Merge pull request #970 from shreyamalviya/telemetry-tests

Telemetry unit tests
Mike Salvatore 2021-02-18 14:18:05 -05:00 committed by GitHub
commit 978927c329
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 490 additions and 29 deletions


@ -3,6 +3,7 @@ from infection_monkey.telemetry.attack.usage_telem import AttackTelem
class T1064Telem(AttackTelem): class T1064Telem(AttackTelem):
def __init__(self, status, usage): def __init__(self, status, usage):
# TODO: rename parameter "usage" to avoid confusion with parameter "usage" in UsageTelem techniques
""" """
T1064 telemetry. T1064 telemetry.
:param status: ScanStatus of technique :param status: ScanStatus of technique


@ -5,6 +5,7 @@ __author__ = "itay.mizeretz"
class T1197Telem(VictimHostTelem): class T1197Telem(VictimHostTelem):
def __init__(self, status, machine, usage): def __init__(self, status, machine, usage):
# TODO: rename parameter "usage" to avoid confusion with parameter "usage" in UsageTelem techniques
""" """
T1197 telemetry. T1197 telemetry.
:param status: ScanStatus of technique :param status: ScanStatus of technique


@ -1,29 +0,0 @@
from unittest import TestCase
from common.utils.attack_utils import ScanStatus
from infection_monkey.model import VictimHost
from infection_monkey.telemetry.attack.victim_host_telem import VictimHostTelem
class TestVictimHostTelem(TestCase):
def test_get_data(self):
machine = VictimHost('127.0.0.1')
status = ScanStatus.USED
technique = 'T1210'
telem = VictimHostTelem(technique, status, machine)
self.assertEqual(telem.telem_category, 'attack')
expected_data = {
'machine': {
'domain_name': machine.domain_name,
'ip_addr': machine.ip_addr
},
'status': status.value,
'technique': technique
}
actual_data = telem.get_data()
self.assertEqual(actual_data, expected_data)


@ -0,0 +1,20 @@
import pytest
from common.utils.attack_utils import ScanStatus
from infection_monkey.telemetry.attack.attack_telem import AttackTelem
STATUS = ScanStatus.USED
TECHNIQUE = "T9999"
@pytest.fixture
def attack_telem_test_instance():
return AttackTelem(TECHNIQUE, STATUS)
def test_attack_telem_send(attack_telem_test_instance, spy_send_telemetry):
attack_telem_test_instance.send()
expected_data = {"status": STATUS.value, "technique": TECHNIQUE}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"


@ -0,0 +1,26 @@
import pytest
from common.utils.attack_utils import ScanStatus
from infection_monkey.telemetry.attack.t1005_telem import T1005Telem
GATHERED_DATA_TYPE = "[Type of data collected]"
INFO = "[Additional info]"
STATUS = ScanStatus.USED
@pytest.fixture
def T1005_telem_test_instance():
return T1005Telem(STATUS, GATHERED_DATA_TYPE, INFO)
def test_T1005_send(T1005_telem_test_instance, spy_send_telemetry):
T1005_telem_test_instance.send()
expected_data = {
"status": STATUS.value,
"technique": "T1005",
"gathered_data_type": GATHERED_DATA_TYPE,
"info": INFO,
}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"


@ -0,0 +1,20 @@
import pytest
from common.utils.attack_utils import ScanStatus, UsageEnum
from infection_monkey.telemetry.attack.t1035_telem import T1035Telem
STATUS = ScanStatus.USED
USAGE = UsageEnum.SMB
@pytest.fixture
def T1035_telem_test_instance():
return T1035Telem(STATUS, USAGE)
def test_T1035_send(T1035_telem_test_instance, spy_send_telemetry):
T1035_telem_test_instance.send()
expected_data = {"status": STATUS.value, "technique": "T1035", "usage": USAGE.name}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"


@ -0,0 +1,20 @@
import pytest
from common.utils.attack_utils import ScanStatus
from infection_monkey.telemetry.attack.t1064_telem import T1064Telem
STATUS = ScanStatus.USED
USAGE_STR = "[Usage info]"
@pytest.fixture
def T1064_telem_test_instance():
return T1064Telem(STATUS, USAGE_STR)
def test_T1064_send(T1064_telem_test_instance, spy_send_telemetry):
T1064_telem_test_instance.send()
expected_data = {"status": STATUS.value, "technique": "T1064", "usage": USAGE_STR}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"


@ -0,0 +1,28 @@
import pytest
from common.utils.attack_utils import ScanStatus
from infection_monkey.telemetry.attack.t1105_telem import T1105Telem
DST_IP = "0.0.0.1"
FILENAME = "virus.exe"
SRC_IP = "0.0.0.0"
STATUS = ScanStatus.USED
@pytest.fixture
def T1105_telem_test_instance():
return T1105Telem(STATUS, SRC_IP, DST_IP, FILENAME)
def test_T1105_send(T1105_telem_test_instance, spy_send_telemetry):
T1105_telem_test_instance.send()
expected_data = {
"status": STATUS.value,
"technique": "T1105",
"filename": FILENAME,
"src": SRC_IP,
"dst": DST_IP,
}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"


@ -0,0 +1,20 @@
import pytest
from common.utils.attack_utils import ScanStatus, UsageEnum
from infection_monkey.telemetry.attack.t1106_telem import T1106Telem
STATUS = ScanStatus.USED
USAGE = UsageEnum.SMB
@pytest.fixture
def T1106_telem_test_instance():
return T1106Telem(STATUS, USAGE)
def test_T1106_send(T1106_telem_test_instance, spy_send_telemetry):
T1106_telem_test_instance.send()
expected_data = {"status": STATUS.value, "technique": "T1106", "usage": USAGE.name}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"


@ -0,0 +1,20 @@
import pytest
from common.utils.attack_utils import ScanStatus
from infection_monkey.telemetry.attack.t1107_telem import T1107Telem
PATH = "path/to/file.txt"
STATUS = ScanStatus.USED
@pytest.fixture
def T1107_telem_test_instance():
return T1107Telem(STATUS, PATH)
def test_T1107_send(T1107_telem_test_instance, spy_send_telemetry):
T1107_telem_test_instance.send()
expected_data = {"status": STATUS.value, "technique": "T1107", "path": PATH}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"


@ -0,0 +1,20 @@
import pytest
from common.utils.attack_utils import ScanStatus, UsageEnum
from infection_monkey.telemetry.attack.t1129_telem import T1129Telem
STATUS = ScanStatus.USED
USAGE = UsageEnum.SMB
@pytest.fixture
def T1129_telem_test_instance():
return T1129Telem(STATUS, USAGE)
def test_T1129_send(T1129_telem_test_instance, spy_send_telemetry):
T1129_telem_test_instance.send()
expected_data = {"status": STATUS.value, "technique": "T1129", "usage": USAGE.name}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"


@ -0,0 +1,29 @@
import pytest
from common.utils.attack_utils import ScanStatus
from infection_monkey.model import VictimHost
from infection_monkey.telemetry.attack.t1197_telem import T1197Telem
DOMAIN_NAME = "domain-name"
IP = "127.0.0.1"
MACHINE = VictimHost(IP, DOMAIN_NAME)
STATUS = ScanStatus.USED
USAGE_STR = "[Usage info]"
@pytest.fixture
def T1197_telem_test_instance():
return T1197Telem(STATUS, MACHINE, USAGE_STR)
def test_T1197_send(T1197_telem_test_instance, spy_send_telemetry):
T1197_telem_test_instance.send()
expected_data = {
"machine": {"domain_name": DOMAIN_NAME, "ip_addr": IP},
"status": STATUS.value,
"technique": "T1197",
"usage": USAGE_STR,
}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"


@ -0,0 +1,29 @@
import pytest
from common.utils.attack_utils import ScanStatus
from infection_monkey.model import VictimHost
from infection_monkey.telemetry.attack.t1222_telem import T1222Telem
COMMAND = "echo hi"
DOMAIN_NAME = "domain-name"
IP = "127.0.0.1"
MACHINE = VictimHost(IP, DOMAIN_NAME)
STATUS = ScanStatus.USED
@pytest.fixture
def T1222_telem_test_instance():
return T1222Telem(STATUS, COMMAND, MACHINE)
def test_T1222_send(T1222_telem_test_instance, spy_send_telemetry):
T1222_telem_test_instance.send()
expected_data = {
"machine": {"domain_name": DOMAIN_NAME, "ip_addr": IP},
"status": STATUS.value,
"technique": "T1222",
"command": COMMAND,
}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"


@ -0,0 +1,25 @@
import pytest
from common.utils.attack_utils import ScanStatus, UsageEnum
from infection_monkey.telemetry.attack.usage_telem import UsageTelem
STATUS = ScanStatus.USED
TECHNIQUE = "T9999"
USAGE = UsageEnum.SMB
@pytest.fixture
def usage_telem_test_instance():
return UsageTelem(TECHNIQUE, STATUS, USAGE)
def test_usage_telem_send(usage_telem_test_instance, spy_send_telemetry):
usage_telem_test_instance.send()
expected_data = {
"status": STATUS.value,
"technique": TECHNIQUE,
"usage": USAGE.name,
}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"


@ -0,0 +1,28 @@
import pytest
from common.utils.attack_utils import ScanStatus, UsageEnum
from infection_monkey.model import VictimHost
from infection_monkey.telemetry.attack.victim_host_telem import VictimHostTelem
DOMAIN_NAME = "domain-name"
IP = "127.0.0.1"
MACHINE = VictimHost(IP, DOMAIN_NAME)
STATUS = ScanStatus.USED
TECHNIQUE = "T9999"
@pytest.fixture
def victim_host_telem_test_instance():
return VictimHostTelem(TECHNIQUE, STATUS, MACHINE)
def test_victim_host_telem_send(victim_host_telem_test_instance, spy_send_telemetry):
victim_host_telem_test_instance.send()
expected_data = {
"machine": {"domain_name": DOMAIN_NAME, "ip_addr": IP},
"status": STATUS.value,
"technique": TECHNIQUE,
}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"


@ -0,0 +1,15 @@
import pytest
from infection_monkey.control import ControlClient
@pytest.fixture
def spy_send_telemetry(monkeypatch):
def _spy_send_telemetry(telem_category, data):
_spy_send_telemetry.telem_category = telem_category
_spy_send_telemetry.data = data
_spy_send_telemetry.telem_category = None
_spy_send_telemetry.data = None
monkeypatch.setattr(ControlClient, "send_telemetry", _spy_send_telemetry)
return _spy_send_telemetry
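Review bot: The spy keeps only the most recent call, so a `send()` that invokes `send_telemetry` twice, or one that emits an unrelated telemetry before the expected one, is indistinguishable from a single correct call in every test that uses this fixture. Recording every invocation makes the tests able to assert the call count as well: add `_spy_send_telemetry.calls.append((telem_category, data))` inside the inner function and `_spy_send_telemetry.calls = []` next to the two `None` initialisations, then tests can also check `len(spy_send_telemetry.calls) == 1`. Separately, `monkeypatch.setattr(ControlClient, "send_telemetry", _spy_send_telemetry)` installs a plain function on the class; this hunk does not show how `send_telemetry` is declared or invoked, and if it is a `@staticmethod` that the telemetry base class calls through an instance, the plain-function replacement would be bound and receive the instance as `telem_category` — wrapping it as `staticmethod(_spy_send_telemetry)` is safe in either case. A one-line docstring on the fixture stating that it records the last `(telem_category, data)` pair passed to `ControlClient.send_telemetry` would help readers of the other test modules.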


@ -0,0 +1,50 @@
import pytest
from infection_monkey.exploit.wmiexec import WmiExploiter
from infection_monkey.model.host import VictimHost
from infection_monkey.telemetry.exploit_telem import ExploitTelem
DOMAIN_NAME = "domain-name"
IP = "0.0.0.0"
HOST = VictimHost(IP, DOMAIN_NAME)
HOST_AS_DICT = {
"ip_addr": IP,
"domain_name": DOMAIN_NAME,
"os": {},
"services": {},
"icmp": False,
"monkey_exe": None,
"default_tunnel": None,
"default_server": None,
}
EXPLOITER = WmiExploiter(HOST)
EXPLOITER_NAME = "WmiExploiter"
EXPLOITER_INFO = {
"display_name": WmiExploiter._EXPLOITED_SERVICE,
"started": "",
"finished": "",
"vulnerable_urls": [],
"vulnerable_ports": [],
"executed_cmds": [],
}
EXPLOITER_ATTEMPTS = []
RESULT = False
@pytest.fixture
def exploit_telem_test_instance():
return ExploitTelem(EXPLOITER, RESULT)
def test_exploit_telem_send(exploit_telem_test_instance, spy_send_telemetry):
exploit_telem_test_instance.send()
expected_data = {
"result": RESULT,
"machine": HOST_AS_DICT,
"exploiter": EXPLOITER_NAME,
"info": EXPLOITER_INFO,
"attempts": EXPLOITER_ATTEMPTS,
}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "exploit"
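Review bot: `EXPLOITER = WmiExploiter(HOST)` runs at import time, so any config, credential or network access in `WmiExploiter.__init__` (not visible here) would fail the whole collection rather than this one test; constructing the exploiter inside `exploit_telem_test_instance` keeps that cost and risk local to the test. The expectation `"display_name": WmiExploiter._EXPLOITED_SERVICE` also couples the test to a private attribute of a production class; if the telemetry is expected to emit that value, spelling it as a literal alongside `EXPLOITER_NAME = "WmiExploiter"` makes the test detect an accidental change to it instead of silently following it. `HOST_AS_DICT` is duplicated verbatim in the scan telemetry test in this same commit; a shared constant or fixture in `conftest.py` would keep the two in step when `VictimHost.as_dict()` changes.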


@ -0,0 +1,36 @@
import pytest
from infection_monkey.telemetry.post_breach_telem import PostBreachTelem
HOSTNAME = "hostname"
IP = "0.0.0.0"
PBA_COMMAND = "run some pba"
PBA_NAME = "some pba"
RESULT = False
class StubSomePBA:
def __init__(self):
self.name = PBA_NAME
self.command = PBA_COMMAND
@pytest.fixture
def post_breach_telem_test_instance(monkeypatch):
PBA = StubSomePBA()
monkeypatch.setattr(PostBreachTelem, "_get_hostname_and_ip", lambda: (HOSTNAME, IP))
return PostBreachTelem(PBA, RESULT)
def test_post_breach_telem_send(post_breach_telem_test_instance, spy_send_telemetry):
post_breach_telem_test_instance.send()
expected_data = {
"command": PBA_COMMAND,
"result": RESULT,
"name": PBA_NAME,
"hostname": HOSTNAME,
"ip": IP,
}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "post_breach"
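Review bot: `monkeypatch.setattr(PostBreachTelem, "_get_hostname_and_ip", lambda: (HOSTNAME, IP))` installs a zero-argument plain function on the class; the hunk does not show how `_get_hostname_and_ip` is declared or called, and if the telemetry invokes it through the instance (`self._get_hostname_and_ip()`), the lambda is bound and called with one positional argument, which raises `TypeError`. Wrapping the replacement keeps the test correct whether the real method is a staticmethod or not: `monkeypatch.setattr(PostBreachTelem, "_get_hostname_and_ip", staticmethod(lambda: (HOSTNAME, IP)))`. `StubSomePBA` would also read better with a one-line docstring saying it stands in for a post-breach action exposing only `name` and `command`, since that is the whole contract the telemetry is being tested against.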


@ -0,0 +1,32 @@
import pytest
from infection_monkey.telemetry.scan_telem import ScanTelem
from infection_monkey.model.host import VictimHost
DOMAIN_NAME = "domain-name"
IP = "0.0.0.0"
HOST = VictimHost(IP, DOMAIN_NAME)
HOST_AS_DICT = {
"ip_addr": IP,
"domain_name": DOMAIN_NAME,
"os": {},
"services": {},
"icmp": False,
"monkey_exe": None,
"default_tunnel": None,
"default_server": None,
}
HOST_SERVICES = {}
@pytest.fixture
def scan_telem_test_instance():
return ScanTelem(HOST)
def test_scan_telem_send(scan_telem_test_instance, spy_send_telemetry):
scan_telem_test_instance.send()
expected_data = {"machine": HOST_AS_DICT, "service_count": len(HOST_SERVICES)}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "scan"
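Review bot: `HOST_SERVICES = {}` is a separate object from whatever services `HOST` carries, so the `service_count` expectation is derived from a constant unrelated to the host under test and always evaluates to 0, which is also what an implementation that ignored the host's services entirely would report. Assuming `VictimHost` exposes a `services` mapping, as the `"services": {}` key in `HOST_AS_DICT` suggests, giving the host one entry in the fixture and expecting `"service_count": 1` would make the count meaningful; at minimum `"service_count": len(HOST.services)` ties the expectation to the object actually passed in. `HOST_AS_DICT` is also a verbatim copy of the one in the exploit telemetry test added by this commit and could live once in `conftest.py`.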


@ -0,0 +1,19 @@
import pytest
from infection_monkey.telemetry.state_telem import StateTelem
IS_DONE = True
VERSION = "version"
@pytest.fixture
def state_telem_test_instance():
return StateTelem(IS_DONE, VERSION)
def test_state_telem_send(state_telem_test_instance, spy_send_telemetry):
state_telem_test_instance.send()
expected_data = {"done": IS_DONE, "version": VERSION}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "state"


@ -0,0 +1,18 @@
import pytest
from infection_monkey.telemetry.system_info_telem import SystemInfoTelem
SYSTEM_INFO = {}
@pytest.fixture
def system_info_telem_test_instance():
return SystemInfoTelem(SYSTEM_INFO)
def test_system_info_telem_send(system_info_telem_test_instance, spy_send_telemetry):
system_info_telem_test_instance.send()
expected_data = SYSTEM_INFO
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "system_info"
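Review bot: `expected_data = SYSTEM_INFO` compares the sent payload against the very object that was passed to the constructor, and since that object is `{}`, the assertion passes for an implementation that forwards the input, copies it, drops every key, or replaces it with any other empty dict. A non-empty constructed input compared against an independent literal, e.g. `SYSTEM_INFO = {"example_key": "example_value"}` with `expected_data = {"example_key": "example_value"}` (sample values), would actually pin that the telemetry emits the system info it was given. The `test_trace_telem` and `test_state_telem` modules in this commit avoid the problem by building their expected dicts from scalar constants, so the same pattern here would keep the modules consistent.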


@ -0,0 +1,18 @@
import pytest
from infection_monkey.telemetry.trace_telem import TraceTelem
MSG = "message"
@pytest.fixture
def trace_telem_test_instance():
return TraceTelem(MSG)
def test_trace_telem_send(trace_telem_test_instance, spy_send_telemetry):
trace_telem_test_instance.send()
expected_data = {"msg": MSG}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "trace"


@ -0,0 +1,15 @@
import pytest
from infection_monkey.telemetry.tunnel_telem import TunnelTelem
@pytest.fixture
def tunnel_telem_test_instance():
return TunnelTelem()
def test_tunnel_telem_send(tunnel_telem_test_instance, spy_send_telemetry):
tunnel_telem_test_instance.send()
expected_data = {"proxy": None}
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "tunnel"
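Review bot: `TunnelTelem()` takes no arguments, so the expected `{"proxy": None}` depends entirely on whatever process-wide state the telemetry reads the proxy from, which is not visible in this hunk; the test will pass or fail depending on test ordering if any other test in the session sets a proxy. Pinning that source with `monkeypatch` inside `tunnel_telem_test_instance`, the way the post-breach test pins `_get_hostname_and_ip`, would make the expectation independent of global state, and a second case that sets a proxy value and expects it back would show the field is actually populated rather than always `None`. A short comment on the fixture naming the attribute the telemetry reads would also tell the next maintainer what this test is really exercising.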