Agent: Generate timestamp when checking for tcp ports

This commit is contained in:
Ilija Lazoroski 2022-09-30 16:48:16 +02:00
parent 96af86f766
commit 9154f6f9dc
1 changed files with 17 additions and 8 deletions

View File

@ -1,8 +1,9 @@
import logging import logging
import select import select
import socket import socket
import time from ipaddress import IPv4Address
from pprint import pformat from pprint import pformat
from time import sleep, time
from typing import Collection, Dict, Iterable, Mapping, Tuple from typing import Collection, Dict, Iterable, Mapping, Tuple
from common.agent_events import TCPScanEvent from common.agent_events import TCPScanEvent
@ -32,22 +33,29 @@ def scan_tcp_ports(
def _scan_tcp_ports( def _scan_tcp_ports(
host: str, ports_to_scan: Collection[int], timeout: float, agent_event_queue: IAgentEventQueue host: str, ports_to_scan: Collection[int], timeout: float, agent_event_queue: IAgentEventQueue
) -> Dict[int, PortScanData]: ) -> Dict[int, PortScanData]:
open_ports = _check_tcp_ports(host, ports_to_scan, timeout) event_timestamp, open_ports = _check_tcp_ports(host, ports_to_scan, timeout)
port_scan_data = _build_port_scan_data(ports_to_scan, open_ports) port_scan_data = _build_port_scan_data(ports_to_scan, open_ports)
tcp_scan_event = _generate_tcp_scan_event(host, port_scan_data) tcp_scan_event = _generate_tcp_scan_event(host, port_scan_data, event_timestamp)
agent_event_queue.publish(tcp_scan_event) agent_event_queue.publish(tcp_scan_event)
return port_scan_data return port_scan_data
def _generate_tcp_scan_event(host: str, port_scan_data: Dict[int, PortScanData]): def _generate_tcp_scan_event(
host: str, port_scan_data: Dict[int, PortScanData], event_timestamp: float
):
port_statuses = {} port_statuses = {}
for port, data in port_scan_data.items(): for port, data in port_scan_data.items():
port_statuses[port] = data.status port_statuses[port] = data.status
return TCPScanEvent(source=get_agent_id(), target=host, ports=port_statuses) return TCPScanEvent(
source=get_agent_id(),
target=IPv4Address(host),
timestamp=event_timestamp,
ports=port_statuses,
)
def _build_port_scan_data( def _build_port_scan_data(
@ -72,7 +80,7 @@ def _get_closed_port_data(port: int) -> PortScanData:
def _check_tcp_ports( def _check_tcp_ports(
ip: str, ports_to_scan: Collection[int], timeout: float = DEFAULT_TIMEOUT ip: str, ports_to_scan: Collection[int], timeout: float = DEFAULT_TIMEOUT
) -> Dict[int, str]: ) -> Tuple[float, Dict[int, str]]:
""" """
Checks whether any of the given ports are open on a target IP. Checks whether any of the given ports are open on a target IP.
:param ip: IP of host to attack :param ip: IP of host to attack
@ -89,6 +97,7 @@ def _check_tcp_ports(
connected_ports = set() connected_ports = set()
open_ports = {} open_ports = {}
event_timestamp = time()
try: try:
logger.debug( logger.debug(
"Connecting to the following ports %s" % ",".join((str(x) for x in ports_to_scan)) "Connecting to the following ports %s" % ",".join((str(x) for x in ports_to_scan))
@ -117,7 +126,7 @@ def _check_tcp_ports(
while (not timer.is_expired()) and sockets_to_try: while (not timer.is_expired()) and sockets_to_try:
# The call to select() may return sockets that are writeable but not actually # The call to select() may return sockets that are writeable but not actually
# connected. Adding this sleep prevents excessive looping. # connected. Adding this sleep prevents excessive looping.
time.sleep(min(POLL_INTERVAL, timer.time_remaining)) sleep(min(POLL_INTERVAL, timer.time_remaining))
sock_objects = [s[1] for s in sockets_to_try] sock_objects = [s[1] for s in sockets_to_try]
@ -153,7 +162,7 @@ def _check_tcp_ports(
_clean_up_sockets(possible_ports, connected_ports) _clean_up_sockets(possible_ports, connected_ports)
return open_ports return event_timestamp, open_ports
def _clean_up_sockets( def _clean_up_sockets(