Agent: Add debug logging to TCPRelay

This commit is contained in:
Kekoa Kaaikala 2022-09-12 20:45:37 +00:00
parent c532cdec72
commit 3dd2052dc5
4 changed files with 28 additions and 4 deletions

View File

@ -1,5 +1,6 @@
from dataclasses import dataclass from dataclasses import dataclass
from ipaddress import IPv4Address from ipaddress import IPv4Address
from logging import getLogger
from threading import Lock from threading import Lock
from typing import Dict from typing import Dict
@ -13,6 +14,9 @@ DEFAULT_NEW_CLIENT_TIMEOUT = 2.5 * MEDIUM_REQUEST_TIMEOUT
DEFAULT_DISCONNECT_TIMEOUT = 60 * 2 # Wait up to 2 minutes for clients to disconnect DEFAULT_DISCONNECT_TIMEOUT = 60 * 2 # Wait up to 2 minutes for clients to disconnect
logger = getLogger(__name__)
@dataclass @dataclass
class RelayUser: class RelayUser:
address: IPv4Address address: IPv4Address
@ -48,7 +52,9 @@ class RelayUserHandler:
timer = EggTimer() timer = EggTimer()
timer.set(self._client_disconnect_timeout) timer.set(self._client_disconnect_timeout)
self._relay_users[user_address] = RelayUser(user_address, timer) user = RelayUser(user_address, timer)
self._relay_users[user_address] = user
logger.debug(f"Added relay user {user}")
def add_potential_user(self, user_address: IPv4Address): def add_potential_user(self, user_address: IPv4Address):
""" """
@ -60,7 +66,9 @@ class RelayUserHandler:
with self._lock: with self._lock:
timer = EggTimer() timer = EggTimer()
timer.set(self._new_client_timeout) timer.set(self._new_client_timeout)
self._potential_users[user_address] = RelayUser(user_address, timer) user = RelayUser(user_address, timer)
self._potential_users[user_address] = user
logger.debug(f"Added potential relay user {user}")
def disconnect_user(self, user_address: IPv4Address): def disconnect_user(self, user_address: IPv4Address):
""" """
@ -70,6 +78,7 @@ class RelayUserHandler:
""" """
with self._lock: with self._lock:
if user_address in self._relay_users: if user_address in self._relay_users:
logger.debug(f"Disconnected user {user_address}")
del_key(self._relay_users, user_address) del_key(self._relay_users, user_address)
def has_potential_users(self) -> bool: def has_potential_users(self) -> bool:

View File

@ -39,9 +39,12 @@ class TCPConnectionHandler(Thread, InterruptableThreadMixin):
except socket.timeout: except socket.timeout:
continue continue
logging.debug(f"New connection received from: {source.getpeername()}")
for notify_client_connected in self._client_connected: for notify_client_connected in self._client_connected:
notify_client_connected(source) notify_client_connected(source)
except: except OSError:
logging.exception("Uncaught error in TCPConnectionHandler thread") logging.exception("Uncaught error in TCPConnectionHandler thread")
finally: finally:
l_socket.close() l_socket.close()
logging.info("Exiting connection handler.")

View File

@ -1,11 +1,14 @@
import socket import socket
from ipaddress import IPv4Address from ipaddress import IPv4Address
from logging import getLogger
from threading import Lock from threading import Lock
from typing import Set from typing import Set
from .consts import SOCKET_TIMEOUT from .consts import SOCKET_TIMEOUT
from .sockets_pipe import SocketsPipe from .sockets_pipe import SocketsPipe
logger = getLogger(__name__)
class TCPPipeSpawner: class TCPPipeSpawner:
""" """
@ -37,6 +40,7 @@ class TCPPipeSpawner:
pipe = SocketsPipe(source, dest, self._handle_pipe_closed) pipe = SocketsPipe(source, dest, self._handle_pipe_closed)
with self._lock: with self._lock:
self._pipes.add(pipe) self._pipes.add(pipe)
pipe.start() pipe.start()
def has_open_pipes(self) -> bool: def has_open_pipes(self) -> bool:
@ -50,4 +54,5 @@ class TCPPipeSpawner:
def _handle_pipe_closed(self, pipe: SocketsPipe): def _handle_pipe_closed(self, pipe: SocketsPipe):
with self._lock: with self._lock:
logger.debug(f"Closing pipe {pipe}")
self._pipes.discard(pipe) self._pipes.discard(pipe)

View File

@ -1,4 +1,5 @@
from ipaddress import IPv4Address from ipaddress import IPv4Address
from logging import getLogger
from threading import Lock, Thread from threading import Lock, Thread
from time import sleep from time import sleep
@ -10,6 +11,8 @@ from infection_monkey.network.relay import (
) )
from infection_monkey.utils.threading import InterruptableThreadMixin from infection_monkey.utils.threading import InterruptableThreadMixin
logger = getLogger(__name__)
class TCPRelay(Thread, InterruptableThreadMixin): class TCPRelay(Thread, InterruptableThreadMixin):
""" """
@ -23,7 +26,10 @@ class TCPRelay(Thread, InterruptableThreadMixin):
dest_port: int, dest_port: int,
client_disconnect_timeout: float, client_disconnect_timeout: float,
): ):
self._user_handler = RelayUserHandler(client_disconnect_timeout=client_disconnect_timeout) self._user_handler = RelayUserHandler(
new_client_timeout=client_disconnect_timeout,
client_disconnect_timeout=client_disconnect_timeout,
)
self._pipe_spawner = TCPPipeSpawner(dest_addr, dest_port) self._pipe_spawner = TCPPipeSpawner(dest_addr, dest_port)
relay_filter = RelayConnectionHandler(self._pipe_spawner, self._user_handler) relay_filter = RelayConnectionHandler(self._pipe_spawner, self._user_handler)
self._connection_handler = TCPConnectionHandler( self._connection_handler = TCPConnectionHandler(
@ -46,6 +52,7 @@ class TCPRelay(Thread, InterruptableThreadMixin):
self._connection_handler.stop() self._connection_handler.stop()
self._connection_handler.join() self._connection_handler.join()
self._wait_for_pipes_to_close() self._wait_for_pipes_to_close()
logger.info("TCP Relay closed.")
def add_potential_user(self, user_address: IPv4Address): def add_potential_user(self, user_address: IPv4Address):
""" """