Merge pull request #2379 from guardicore/2267-update-nodes-with-tcp-scan-events

2267 update nodes with tcp scan events
This commit is contained in:
Mike Salvatore 2022-10-01 19:16:54 -04:00 committed by GitHub
commit 25f12305f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 170 additions and 106 deletions

View File

@ -1,3 +1,3 @@
from .handle_ping_scan_event import handle_ping_scan_event
from .save_event_to_event_repository import save_event_to_event_repository
from .save_stolen_credentials_to_repository import save_stolen_credentials_to_repository
from .scan_event_handler import ScanEventHandler

View File

@ -1,8 +1,11 @@
from ipaddress import IPv4Interface
from logging import getLogger
from typing import Union
from pydantic.json import IPv4Interface
from typing_extensions import TypeAlias
from common.agent_events import PingScanEvent
from common.agent_events import PingScanEvent, TCPScanEvent
from common.types import PortStatus
from monkey_island.cc.models import CommunicationType, Machine
from monkey_island.cc.repository import (
IAgentRepository,
@ -13,12 +16,14 @@ from monkey_island.cc.repository import (
UnknownRecordError,
)
ScanEvent: TypeAlias = Union[PingScanEvent, TCPScanEvent]
logger = getLogger(__name__)
class handle_ping_scan_event:
class ScanEventHandler:
"""
Handles ping scan event and makes changes to Machine and Node states based on it
Handles scan event and makes changes to Machine and Node states based on it
"""
def __init__(
@ -31,10 +36,7 @@ class handle_ping_scan_event:
self._machine_repository = machine_repository
self._node_repository = node_repository
def __call__(self, event: PingScanEvent):
"""
:param event: Ping scan event to process
"""
def handle_ping_scan_event(self, event: PingScanEvent):
if not event.response_received:
return
@ -46,7 +48,20 @@ class handle_ping_scan_event:
except (RetrievalError, StorageError, UnknownRecordError):
logger.exception("Unable to process ping scan data")
def _get_target_machine(self, event: PingScanEvent) -> Machine:
def handle_tcp_scan_event(self, event: TCPScanEvent):
num_open_ports = sum((1 for status in event.ports.values() if status == PortStatus.OPEN))
if num_open_ports <= 0:
return
try:
target_machine = self._get_target_machine(event)
self._update_nodes(target_machine, event)
except (RetrievalError, StorageError, UnknownRecordError):
logger.exception("Unable to process tcp scan data")
def _get_target_machine(self, event: ScanEvent) -> Machine:
try:
target_machines = self._machine_repository.get_machines_by_ip(event.target)
return target_machines[0]
@ -63,14 +78,13 @@ class handle_ping_scan_event:
machine.operating_system = event.os
self._machine_repository.upsert_machine(machine)
def _update_nodes(self, target_machine, event):
def _update_nodes(self, target_machine: Machine, event: ScanEvent):
src_machine = self._get_source_machine(event)
# Update or create the node
self._node_repository.upsert_communication(
src_machine.id, target_machine.id, CommunicationType.SCANNED
)
def _get_source_machine(self, event: PingScanEvent) -> Machine:
def _get_source_machine(self, event: ScanEvent) -> Machine:
agent = self._agent_repository.get_agent_by_id(event.source)
return self._machine_repository.get_machine_by_id(agent.machine_id)

View File

@ -1,28 +1,23 @@
from common import DIContainer
from common.agent_events import CredentialsStolenEvent, PingScanEvent
from common.agent_events import CredentialsStolenEvent, PingScanEvent, TCPScanEvent
from common.event_queue import IAgentEventQueue
from monkey_island.cc.agent_event_handlers import (
handle_ping_scan_event,
ScanEventHandler,
save_event_to_event_repository,
save_stolen_credentials_to_repository,
)
from monkey_island.cc.repository import (
IAgentEventRepository,
IAgentRepository,
ICredentialsRepository,
IMachineRepository,
INodeRepository,
)
from monkey_island.cc.repository import IAgentEventRepository, ICredentialsRepository
def setup_agent_event_handlers(container: DIContainer):
_subscribe_and_store_to_event_repository(container)
_subscribe_ping_scan_event(container)
_subscribe_scan_events(container)
def _subscribe_and_store_to_event_repository(container: DIContainer):
agent_event_queue = container.resolve(IAgentEventQueue)
# TODO: Can't we just `container.resolve(save_event_to_event_repository)`?
save_event_subscriber = save_event_to_event_repository(container.resolve(IAgentEventRepository))
agent_event_queue.subscribe_all_events(save_event_subscriber)
@ -32,12 +27,9 @@ def _subscribe_and_store_to_event_repository(container: DIContainer):
agent_event_queue.subscribe_type(CredentialsStolenEvent, save_stolen_credentials_subscriber)
def _subscribe_ping_scan_event(container: DIContainer):
def _subscribe_scan_events(container: DIContainer):
agent_event_queue = container.resolve(IAgentEventQueue)
agent_repository = container.resolve(IAgentRepository)
machine_repository = container.resolve(IMachineRepository)
node_repository = container.resolve(INodeRepository)
scan_event_handler = container.resolve(ScanEventHandler)
handler = handle_ping_scan_event(agent_repository, machine_repository, node_repository)
agent_event_queue.subscribe_type(PingScanEvent, handler)
agent_event_queue.subscribe_type(PingScanEvent, scan_event_handler.handle_ping_scan_event)
agent_event_queue.subscribe_type(TCPScanEvent, scan_event_handler.handle_tcp_scan_event)

View File

@ -6,9 +6,9 @@ from uuid import UUID
import pytest
from common import OperatingSystem
from common.agent_events import PingScanEvent
from common.types import SocketAddress
from monkey_island.cc.agent_event_handlers import handle_ping_scan_event
from common.agent_events import PingScanEvent, TCPScanEvent
from common.types import PortStatus, SocketAddress
from monkey_island.cc.agent_event_handlers import ScanEventHandler
from monkey_island.cc.models import Agent, CommunicationType, Machine
from monkey_island.cc.repository import (
IAgentRepository,
@ -24,7 +24,7 @@ AGENT_ID = UUID("1d8ce743-a0f4-45c5-96af-91106529d3e2")
MACHINE_ID = 11
CC_SERVER = SocketAddress(ip="10.10.10.100", port="5000")
AGENT = Agent(id=AGENT_ID, machine_id=MACHINE_ID, start_time=0, parent_id=None, cc_server=CC_SERVER)
PINGER_MACHINE = Machine(
SOURCE_MACHINE = Machine(
id=MACHINE_ID,
hardware_id=5,
network_interfaces=[IPv4Interface("10.10.10.99/24")],
@ -34,20 +34,41 @@ TARGET_MACHINE = Machine(
hardware_id=9,
network_interfaces=[IPv4Interface("10.10.10.1/24")],
)
EVENT = PingScanEvent(
PING_SCAN_EVENT = PingScanEvent(
source=AGENT_ID,
target=IPv4Address("10.10.10.1"),
response_received=True,
os=OperatingSystem.LINUX,
)
EVENT_NO_RESPONSE = PingScanEvent(
PING_SCAN_EVENT_NO_RESPONSE = PingScanEvent(
source=AGENT_ID,
target=IPv4Address("10.10.10.1"),
response_received=False,
os=OperatingSystem.LINUX,
)
PING_SCAN_EVENT_NO_OS = PingScanEvent(
source=AGENT_ID,
target=IPv4Address("10.10.10.1"),
response_received=True,
os=None,
)
TCP_SCAN_EVENT = TCPScanEvent(
source=AGENT_ID,
target=IPv4Address("10.10.10.1"),
ports={22: PortStatus.OPEN, 8080: PortStatus.CLOSED},
)
TCP_SCAN_EVENT_CLOSED = TCPScanEvent(
source=AGENT_ID,
target=IPv4Address("10.10.10.1"),
ports={145: PortStatus.CLOSED, 8080: PortStatus.CLOSED},
)
@pytest.fixture
def agent_repository() -> IAgentRepository:
@ -60,6 +81,8 @@ def agent_repository() -> IAgentRepository:
@pytest.fixture
def machine_repository() -> IMachineRepository:
machine_repository = MagicMock(spec=IMachineRepository)
machine_repository.get_machine_by_id = MagicMock(side_effect=machine_from_id)
machine_repository.get_machines_by_ip = MagicMock(side_effect=machines_from_ip)
machine_repository.get_new_id = MagicMock(side_effect=count(SEED_ID))
machine_repository.upsert_machine = MagicMock()
return machine_repository
@ -73,24 +96,28 @@ def node_repository() -> INodeRepository:
@pytest.fixture
def handler(agent_repository, machine_repository, node_repository) -> handle_ping_scan_event:
return handle_ping_scan_event(agent_repository, machine_repository, node_repository)
def scan_event_handler(agent_repository, machine_repository, node_repository):
return ScanEventHandler(agent_repository, machine_repository, node_repository)
machines = {MACHINE_ID: PINGER_MACHINE, TARGET_MACHINE.id: TARGET_MACHINE}
machines_by_id = {MACHINE_ID: PINGER_MACHINE, TARGET_MACHINE.id: TARGET_MACHINE}
machines_by_ip = {
IPv4Address("10.10.10.99"): [PINGER_MACHINE],
MACHINES_BY_ID = {MACHINE_ID: SOURCE_MACHINE, TARGET_MACHINE.id: TARGET_MACHINE}
MACHINES_BY_IP = {
IPv4Address("10.10.10.99"): [SOURCE_MACHINE],
IPv4Address("10.10.10.1"): [TARGET_MACHINE],
}
@pytest.fixture(params=[SOURCE_MACHINE.id, TARGET_MACHINE.id])
def machine_id(request):
return request.param
def machine_from_id(id: int):
return machines_by_id[id]
return MACHINES_BY_ID[id]
def machines_from_ip(ip: IPv4Address):
return machines_by_ip[ip]
return MACHINES_BY_IP[ip]
class error_machine_by_id:
@ -125,51 +152,68 @@ class error_machine_by_ip:
return machines
def test_handle_ping_scan_event__target_machine_not_exists(
handler: handle_ping_scan_event,
machine_repository: IMachineRepository,
HANDLE_PING_SCAN_METHOD = "handle_ping_scan_event"
HANDLE_TCP_SCAN_METHOD = "handle_tcp_scan_event"
@pytest.fixture
def handler(scan_event_handler, request):
return getattr(scan_event_handler, request.param)
def test_ping_scan_event_target_machine_not_exists(
scan_event_handler, machine_repository: IMachineRepository
):
machine_repository.get_machine_by_id = MagicMock(side_effect=machine_from_id)
event = PING_SCAN_EVENT
machine_repository.get_machines_by_ip = MagicMock(side_effect=UnknownRecordError)
handler(EVENT)
scan_event_handler.handle_ping_scan_event(event)
expected_machine = Machine(id=SEED_ID, network_interfaces=[IPv4Interface(EVENT.target)])
expected_machine.operating_system = EVENT.os
expected_machine = Machine(id=SEED_ID, network_interfaces=[IPv4Interface(event.target)])
expected_machine.operating_system = event.os
machine_repository.upsert_machine.assert_called_with(expected_machine)
def test_handle_ping_scan_event__target_machine_already_exists(
handler: handle_ping_scan_event,
machine_repository: IMachineRepository,
def test_tcp_scan_event_target_machine_not_exists(
scan_event_handler, machine_repository: IMachineRepository
):
machine_repository.get_machine_by_id = MagicMock(side_effect=machine_from_id)
machine_repository.get_machines_by_ip = MagicMock(side_effect=machines_from_ip)
event = TCP_SCAN_EVENT
machine_repository.get_machines_by_ip = MagicMock(side_effect=UnknownRecordError)
handler(EVENT)
scan_event_handler.handle_tcp_scan_event(event)
expected_machine = TARGET_MACHINE.copy()
expected_machine.operating_system = OperatingSystem.LINUX
expected_machine = Machine(id=SEED_ID, network_interfaces=[IPv4Interface(event.target)])
machine_repository.upsert_machine.assert_called_with(expected_machine)
def test_handle_ping_scan_event__upserts_node(
handler: handle_ping_scan_event,
@pytest.mark.parametrize(
"event,handler",
[(PING_SCAN_EVENT, HANDLE_PING_SCAN_METHOD), (TCP_SCAN_EVENT, HANDLE_TCP_SCAN_METHOD)],
indirect=["handler"],
)
def test_upserts_node(
event,
handler,
machine_repository: IMachineRepository,
node_repository: INodeRepository,
):
machine_repository.get_machine_by_id = MagicMock(side_effect=machine_from_id)
machine_repository.get_machines_by_ip = MagicMock(return_value=[TARGET_MACHINE])
handler(EVENT)
handler(event)
node_repository.upsert_communication.assert_called_with(
PINGER_MACHINE.id, TARGET_MACHINE.id, CommunicationType.SCANNED
SOURCE_MACHINE.id, TARGET_MACHINE.id, CommunicationType.SCANNED
)
def test_handle_ping_scan_event__node_not_upserted_if_no_matching_agent(
handler: handle_ping_scan_event,
@pytest.mark.parametrize(
"event,handler",
[(PING_SCAN_EVENT, HANDLE_PING_SCAN_METHOD), (TCP_SCAN_EVENT, HANDLE_TCP_SCAN_METHOD)],
indirect=["handler"],
)
def test_node_not_upserted_if_no_matching_agent(
event,
handler,
agent_repository: IAgentRepository,
machine_repository: IMachineRepository,
node_repository: INodeRepository,
@ -177,92 +221,106 @@ def test_handle_ping_scan_event__node_not_upserted_if_no_matching_agent(
agent_repository.get_agent_by_id = MagicMock(side_effect=UnknownRecordError)
machine_repository.get_machine_by_id = MagicMock(return_value=TARGET_MACHINE)
handler(EVENT)
handler(event)
assert not node_repository.upsert_communication.called
def test_handle_ping_scan_event__node_not_upserted_if_no_matching_machine(
handler: handle_ping_scan_event,
@pytest.mark.parametrize(
"event,handler",
[(PING_SCAN_EVENT, HANDLE_PING_SCAN_METHOD), (TCP_SCAN_EVENT, HANDLE_TCP_SCAN_METHOD)],
indirect=["handler"],
)
def test_node_not_upserted_if_machine_retrievalerror(
event,
handler,
machine_repository: IMachineRepository,
node_repository: INodeRepository,
):
machine_repository.get_machine_by_id = MagicMock(side_effect=UnknownRecordError)
handler(EVENT)
assert not node_repository.upsert_communication.called
@pytest.mark.parametrize("id", [PINGER_MACHINE.id, TARGET_MACHINE.id])
def test_handle_scan_data__node_not_upserted_if_machine_retrievalerror(
handler: handle_ping_scan_event,
machine_repository: IMachineRepository,
node_repository: INodeRepository,
id,
machine_id,
):
machine_repository.get_machine_by_id = MagicMock(
side_effect=error_machine_by_id(id, RetrievalError)
side_effect=error_machine_by_id(machine_id, RetrievalError)
)
machine_repository.get_machines_by_ip = MagicMock(
side_effect=error_machine_by_ip(id, RetrievalError)
side_effect=error_machine_by_ip(machine_id, RetrievalError)
)
handler(EVENT)
handler(event)
assert not node_repository.upsert_communication.called
def test_handle_scan_data__machine_not_upserted_if_os_is_none(
handler: handle_ping_scan_event, machine_repository: IMachineRepository
):
event = PingScanEvent(source=EVENT.source, target=EVENT.target, response_received=True, os=None)
machine_repository.get_machine_by_id = MagicMock(side_effect=machine_from_id)
machine_repository.get_machines_by_ip = MagicMock(side_effect=machines_from_ip)
@pytest.mark.parametrize(
"event,handler",
[
(PING_SCAN_EVENT_NO_OS, HANDLE_PING_SCAN_METHOD),
(TCP_SCAN_EVENT_CLOSED, HANDLE_TCP_SCAN_METHOD),
],
indirect=["handler"],
)
def test_machine_not_upserted(event, handler, machine_repository: IMachineRepository):
handler(event)
assert not machine_repository.upsert_machine.called
def test_handle_scan_data__machine_not_upserted_if_existing_machine_has_os(
handler: handle_ping_scan_event, machine_repository: IMachineRepository
def test_machine_not_upserted_if_existing_machine_has_os(
scan_event_handler, machine_repository: IMachineRepository
):
machine_with_os = TARGET_MACHINE
machine_with_os.operating_system = OperatingSystem.WINDOWS
machine_repository.get_machine_by_ip = MagicMock(return_value=machine_with_os)
machine_repository.get_machines_by_ip = MagicMock(return_value=[machine_with_os])
handler(EVENT)
scan_event_handler.handle_ping_scan_event(PING_SCAN_EVENT)
assert not machine_repository.upsert_machine.called
def test_handle_scan_data__node_not_upserted_if_machine_storageerror(
handler: handle_ping_scan_event,
def test_node_not_upserted_by_ping_scan_event_if_machine_storageerror(
scan_event_handler,
machine_repository: IMachineRepository,
node_repository: INodeRepository,
):
target_machine = TARGET_MACHINE
target_machine.operating_system = None
machine_repository.get_machine_by_id = MagicMock(side_effect=machine_from_id)
machine_repository.get_machines_by_ip = MagicMock(side_effect=machines_from_ip)
machine_repository.upsert_machine = MagicMock(side_effect=StorageError)
handler(EVENT)
scan_event_handler.handle_ping_scan_event(PING_SCAN_EVENT)
assert not node_repository.upsert_communication.called
def test_handle_scan_data__failed_ping(
handler: handle_ping_scan_event,
def test_node_not_upserted_by_tcp_scan_event_if_machine_storageerror(
scan_event_handler,
machine_repository: IMachineRepository,
node_repository: INodeRepository,
):
machine_repository.get_machines_by_ip = MagicMock(side_effect=UnknownRecordError)
machine_repository.upsert_machine = MagicMock(side_effect=StorageError)
scan_event_handler.handle_tcp_scan_event(TCP_SCAN_EVENT)
assert not node_repository.upsert_communication.called
@pytest.mark.parametrize(
"event,handler",
[
(PING_SCAN_EVENT_NO_RESPONSE, HANDLE_PING_SCAN_METHOD),
(TCP_SCAN_EVENT_CLOSED, HANDLE_TCP_SCAN_METHOD),
],
indirect=["handler"],
)
def test_failed_scan(
event,
handler,
machine_repository: IMachineRepository,
node_repository: INodeRepository,
):
machine_repository.upsert_machine = MagicMock(side_effect=StorageError)
machine_repository.get_machine_by_id = MagicMock(side_effect=machine_from_id)
machine_repository.get_machines_by_ip = MagicMock(side_effect=machines_from_ip)
handler(EVENT_NO_RESPONSE)
handler(event)
assert not node_repository.upsert_communication.called
assert not machine_repository.upsert_machine.called