Refactored telemetry unit tests to json encode data the same way telemetries do.

This commit is contained in:
VakarisZ 2021-02-19 17:15:49 +02:00
parent c698e0ab66
commit 4158ed802b
19 changed files with 65 additions and 3 deletions

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from common.utils.attack_utils import ScanStatus from common.utils.attack_utils import ScanStatus
@ -16,5 +18,7 @@ def attack_telem_test_instance():
def test_attack_telem_send(attack_telem_test_instance, spy_send_telemetry): def test_attack_telem_send(attack_telem_test_instance, spy_send_telemetry):
attack_telem_test_instance.send() attack_telem_test_instance.send()
expected_data = {"status": STATUS.value, "technique": TECHNIQUE} expected_data = {"status": STATUS.value, "technique": TECHNIQUE}
expected_data = json.dumps(expected_data, cls=attack_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack" assert spy_send_telemetry.telem_category == "attack"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from common.utils.attack_utils import ScanStatus from common.utils.attack_utils import ScanStatus
@ -22,5 +24,6 @@ def test_T1005_send(T1005_telem_test_instance, spy_send_telemetry):
"gathered_data_type": GATHERED_DATA_TYPE, "gathered_data_type": GATHERED_DATA_TYPE,
"info": INFO, "info": INFO,
} }
expected_data = json.dumps(expected_data, cls=T1005_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack" assert spy_send_telemetry.telem_category == "attack"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from common.utils.attack_utils import ScanStatus, UsageEnum from common.utils.attack_utils import ScanStatus, UsageEnum
@ -16,5 +18,6 @@ def T1035_telem_test_instance():
def test_T1035_send(T1035_telem_test_instance, spy_send_telemetry): def test_T1035_send(T1035_telem_test_instance, spy_send_telemetry):
T1035_telem_test_instance.send() T1035_telem_test_instance.send()
expected_data = {"status": STATUS.value, "technique": "T1035", "usage": USAGE.name} expected_data = {"status": STATUS.value, "technique": "T1035", "usage": USAGE.name}
expected_data = json.dumps(expected_data, cls=T1035_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack" assert spy_send_telemetry.telem_category == "attack"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from common.utils.attack_utils import ScanStatus from common.utils.attack_utils import ScanStatus
@ -16,5 +18,6 @@ def T1064_telem_test_instance():
def test_T1064_send(T1064_telem_test_instance, spy_send_telemetry): def test_T1064_send(T1064_telem_test_instance, spy_send_telemetry):
T1064_telem_test_instance.send() T1064_telem_test_instance.send()
expected_data = {"status": STATUS.value, "technique": "T1064", "usage": USAGE_STR} expected_data = {"status": STATUS.value, "technique": "T1064", "usage": USAGE_STR}
expected_data = json.dumps(expected_data, cls=T1064_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack" assert spy_send_telemetry.telem_category == "attack"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from common.utils.attack_utils import ScanStatus from common.utils.attack_utils import ScanStatus
@ -24,5 +26,6 @@ def test_T1105_send(T1105_telem_test_instance, spy_send_telemetry):
"src": SRC_IP, "src": SRC_IP,
"dst": DST_IP, "dst": DST_IP,
} }
expected_data = json.dumps(expected_data, cls=T1105_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack" assert spy_send_telemetry.telem_category == "attack"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from common.utils.attack_utils import ScanStatus, UsageEnum from common.utils.attack_utils import ScanStatus, UsageEnum
@ -16,5 +18,6 @@ def T1106_telem_test_instance():
def test_T1106_send(T1106_telem_test_instance, spy_send_telemetry): def test_T1106_send(T1106_telem_test_instance, spy_send_telemetry):
T1106_telem_test_instance.send() T1106_telem_test_instance.send()
expected_data = {"status": STATUS.value, "technique": "T1106", "usage": USAGE.name} expected_data = {"status": STATUS.value, "technique": "T1106", "usage": USAGE.name}
expected_data = json.dumps(expected_data, cls=T1106_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack" assert spy_send_telemetry.telem_category == "attack"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from common.utils.attack_utils import ScanStatus from common.utils.attack_utils import ScanStatus
@ -16,5 +18,6 @@ def T1107_telem_test_instance():
def test_T1107_send(T1107_telem_test_instance, spy_send_telemetry): def test_T1107_send(T1107_telem_test_instance, spy_send_telemetry):
T1107_telem_test_instance.send() T1107_telem_test_instance.send()
expected_data = {"status": STATUS.value, "technique": "T1107", "path": PATH} expected_data = {"status": STATUS.value, "technique": "T1107", "path": PATH}
expected_data = json.dumps(expected_data, cls=T1107_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack" assert spy_send_telemetry.telem_category == "attack"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from common.utils.attack_utils import ScanStatus, UsageEnum from common.utils.attack_utils import ScanStatus, UsageEnum
@ -16,5 +18,6 @@ def T1129_telem_test_instance():
def test_T1129_send(T1129_telem_test_instance, spy_send_telemetry): def test_T1129_send(T1129_telem_test_instance, spy_send_telemetry):
T1129_telem_test_instance.send() T1129_telem_test_instance.send()
expected_data = {"status": STATUS.value, "technique": "T1129", "usage": USAGE.name} expected_data = {"status": STATUS.value, "technique": "T1129", "usage": USAGE.name}
expected_data = json.dumps(expected_data, cls=T1129_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack" assert spy_send_telemetry.telem_category == "attack"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from common.utils.attack_utils import ScanStatus from common.utils.attack_utils import ScanStatus
@ -20,10 +22,11 @@ def T1197_telem_test_instance():
def test_T1197_send(T1197_telem_test_instance, spy_send_telemetry): def test_T1197_send(T1197_telem_test_instance, spy_send_telemetry):
T1197_telem_test_instance.send() T1197_telem_test_instance.send()
expected_data = { expected_data = {
"machine": {"domain_name": DOMAIN_NAME, "ip_addr": IP},
"status": STATUS.value, "status": STATUS.value,
"technique": "T1197", "technique": "T1197",
"machine": {"domain_name": DOMAIN_NAME, "ip_addr": IP},
"usage": USAGE_STR, "usage": USAGE_STR,
} }
expected_data = json.dumps(expected_data, cls=T1197_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack" assert spy_send_telemetry.telem_category == "attack"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from common.utils.attack_utils import ScanStatus from common.utils.attack_utils import ScanStatus
@ -20,10 +22,11 @@ def T1222_telem_test_instance():
def test_T1222_send(T1222_telem_test_instance, spy_send_telemetry): def test_T1222_send(T1222_telem_test_instance, spy_send_telemetry):
T1222_telem_test_instance.send() T1222_telem_test_instance.send()
expected_data = { expected_data = {
"machine": {"domain_name": DOMAIN_NAME, "ip_addr": IP},
"status": STATUS.value, "status": STATUS.value,
"technique": "T1222", "technique": "T1222",
"machine": {"domain_name": DOMAIN_NAME, "ip_addr": IP},
"command": COMMAND, "command": COMMAND,
} }
expected_data = json.dumps(expected_data, cls=T1222_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack" assert spy_send_telemetry.telem_category == "attack"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from common.utils.attack_utils import ScanStatus, UsageEnum from common.utils.attack_utils import ScanStatus, UsageEnum
@ -21,5 +23,6 @@ def test_usage_telem_send(usage_telem_test_instance, spy_send_telemetry):
"technique": TECHNIQUE, "technique": TECHNIQUE,
"usage": USAGE.name, "usage": USAGE.name,
} }
expected_data = json.dumps(expected_data, cls=usage_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack" assert spy_send_telemetry.telem_category == "attack"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from common.utils.attack_utils import ScanStatus, UsageEnum from common.utils.attack_utils import ScanStatus, UsageEnum
@ -20,9 +22,10 @@ def victim_host_telem_test_instance():
def test_victim_host_telem_send(victim_host_telem_test_instance, spy_send_telemetry): def test_victim_host_telem_send(victim_host_telem_test_instance, spy_send_telemetry):
victim_host_telem_test_instance.send() victim_host_telem_test_instance.send()
expected_data = { expected_data = {
"machine": {"domain_name": DOMAIN_NAME, "ip_addr": IP},
"status": STATUS.value, "status": STATUS.value,
"technique": TECHNIQUE, "technique": TECHNIQUE,
"machine": {"domain_name": DOMAIN_NAME, "ip_addr": IP}
} }
expected_data = json.dumps(expected_data, cls=victim_host_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack" assert spy_send_telemetry.telem_category == "attack"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from infection_monkey.exploit.wmiexec import WmiExploiter from infection_monkey.exploit.wmiexec import WmiExploiter
@ -46,5 +48,6 @@ def test_exploit_telem_send(exploit_telem_test_instance, spy_send_telemetry):
"info": EXPLOITER_INFO, "info": EXPLOITER_INFO,
"attempts": EXPLOITER_ATTEMPTS, "attempts": EXPLOITER_ATTEMPTS,
} }
expected_data = json.dumps(expected_data, cls=exploit_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "exploit" assert spy_send_telemetry.telem_category == "exploit"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from infection_monkey.telemetry.post_breach_telem import PostBreachTelem from infection_monkey.telemetry.post_breach_telem import PostBreachTelem
@ -32,5 +34,6 @@ def test_post_breach_telem_send(post_breach_telem_test_instance, spy_send_teleme
"hostname": HOSTNAME, "hostname": HOSTNAME,
"ip": IP, "ip": IP,
} }
expected_data = json.dumps(expected_data, cls=post_breach_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "post_breach" assert spy_send_telemetry.telem_category == "post_breach"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from infection_monkey.telemetry.scan_telem import ScanTelem from infection_monkey.telemetry.scan_telem import ScanTelem
@ -28,5 +30,7 @@ def scan_telem_test_instance():
def test_scan_telem_send(scan_telem_test_instance, spy_send_telemetry): def test_scan_telem_send(scan_telem_test_instance, spy_send_telemetry):
scan_telem_test_instance.send() scan_telem_test_instance.send()
expected_data = {"machine": HOST_AS_DICT, "service_count": len(HOST_SERVICES)} expected_data = {"machine": HOST_AS_DICT, "service_count": len(HOST_SERVICES)}
expected_data = json.dumps(expected_data, cls=scan_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "scan" assert spy_send_telemetry.telem_category == "scan"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from infection_monkey.telemetry.state_telem import StateTelem from infection_monkey.telemetry.state_telem import StateTelem
@ -15,5 +17,7 @@ def state_telem_test_instance():
def test_state_telem_send(state_telem_test_instance, spy_send_telemetry): def test_state_telem_send(state_telem_test_instance, spy_send_telemetry):
state_telem_test_instance.send() state_telem_test_instance.send()
expected_data = {"done": IS_DONE, "version": VERSION} expected_data = {"done": IS_DONE, "version": VERSION}
expected_data = json.dumps(expected_data, cls=state_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "state" assert spy_send_telemetry.telem_category == "state"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from infection_monkey.telemetry.system_info_telem import SystemInfoTelem from infection_monkey.telemetry.system_info_telem import SystemInfoTelem
@ -14,5 +16,6 @@ def system_info_telem_test_instance():
def test_system_info_telem_send(system_info_telem_test_instance, spy_send_telemetry): def test_system_info_telem_send(system_info_telem_test_instance, spy_send_telemetry):
system_info_telem_test_instance.send() system_info_telem_test_instance.send()
expected_data = SYSTEM_INFO expected_data = SYSTEM_INFO
expected_data = json.dumps(expected_data, cls=system_info_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "system_info" assert spy_send_telemetry.telem_category == "system_info"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from infection_monkey.telemetry.trace_telem import TraceTelem from infection_monkey.telemetry.trace_telem import TraceTelem
@ -14,5 +16,7 @@ def trace_telem_test_instance():
def test_trace_telem_send(trace_telem_test_instance, spy_send_telemetry): def test_trace_telem_send(trace_telem_test_instance, spy_send_telemetry):
trace_telem_test_instance.send() trace_telem_test_instance.send()
expected_data = {"msg": MSG} expected_data = {"msg": MSG}
expected_data = json.dumps(expected_data, cls=trace_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "trace" assert spy_send_telemetry.telem_category == "trace"

View File

@ -1,3 +1,5 @@
import json
import pytest import pytest
from infection_monkey.telemetry.tunnel_telem import TunnelTelem from infection_monkey.telemetry.tunnel_telem import TunnelTelem
@ -11,5 +13,7 @@ def tunnel_telem_test_instance():
def test_tunnel_telem_send(tunnel_telem_test_instance, spy_send_telemetry): def test_tunnel_telem_send(tunnel_telem_test_instance, spy_send_telemetry):
tunnel_telem_test_instance.send() tunnel_telem_test_instance.send()
expected_data = {"proxy": None} expected_data = {"proxy": None}
expected_data = json.dumps(expected_data, cls=tunnel_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "tunnel" assert spy_send_telemetry.telem_category == "tunnel"