Agent: Wait for relay users to disconnect

This commit is contained in:
Kekoa Kaaikala 2022-08-31 14:38:35 +00:00 committed by Mike Salvatore
parent 4bff110f35
commit 90dcb0a91e
3 changed files with 34 additions and 2 deletions

View File

@ -10,6 +10,7 @@ from common.utils.code_utils import del_key
# Wait for potential new clients to connect # Wait for potential new clients to connect
DEFAULT_NEW_CLIENT_TIMEOUT = 2.5 * MEDIUM_REQUEST_TIMEOUT DEFAULT_NEW_CLIENT_TIMEOUT = 2.5 * MEDIUM_REQUEST_TIMEOUT
DEFAULT_DISCONNECT_TIMEOUT = 60 * 10 # Wait up to 10 minutes for clients to disconnect
@dataclass @dataclass
@ -21,8 +22,13 @@ class RelayUser:
class RelayUserHandler: class RelayUserHandler:
"""Manages membership to a network relay.""" """Manages membership to a network relay."""
def __init__(self, new_client_timeout: float = DEFAULT_NEW_CLIENT_TIMEOUT): def __init__(
self,
new_client_timeout: float = DEFAULT_NEW_CLIENT_TIMEOUT,
client_disconnect_timeout: float = DEFAULT_DISCONNECT_TIMEOUT,
):
self._new_client_timeout = new_client_timeout self._new_client_timeout = new_client_timeout
self._client_disconnect_timeout = client_disconnect_timeout
self._relay_users: Dict[IPv4Address, RelayUser] = {} self._relay_users: Dict[IPv4Address, RelayUser] = {}
self._potential_users: Dict[IPv4Address, RelayUser] = {} self._potential_users: Dict[IPv4Address, RelayUser] = {}
@ -41,6 +47,7 @@ class RelayUserHandler:
del_key(self._potential_users, user_address) del_key(self._potential_users, user_address)
timer = EggTimer() timer = EggTimer()
timer.set(self._client_disconnect_timeout)
self._relay_users[user_address] = RelayUser(user_address, timer) self._relay_users[user_address] = RelayUser(user_address, timer)
def add_potential_user(self, user_address: IPv4Address): def add_potential_user(self, user_address: IPv4Address):
@ -74,3 +81,13 @@ class RelayUserHandler:
) )
return len(self._potential_users) > 0 return len(self._potential_users) > 0
def has_connected_users(self) -> bool:
"""
Return whether or not we have any relay users.
"""
self._relay_users = dict(
filter(lambda ru: not ru[1].timer.is_expired(), self._relay_users.items())
)
return len(self._relay_users) > 0

View File

@ -36,7 +36,7 @@ class TCPRelay(Thread, InterruptableThreadMixin):
""" """
Blocks until the users disconnect or the timeout has elapsed. Blocks until the users disconnect or the timeout has elapsed.
""" """
while self._user_handler.has_potential_users(): while self._user_handler.has_potential_users() or self._user_handler.has_connected_users():
sleep(0.5) sleep(0.5)
def _wait_for_pipes_to_close(self): def _wait_for_pipes_to_close(self):

View File

@ -33,3 +33,18 @@ def test_potential_users_time_out():
sleep(0.003) sleep(0.003)
assert not handler.has_potential_users() assert not handler.has_potential_users()
def test_relay_users_added(handler):
assert not handler.has_connected_users()
handler.add_relay_user(USER_ADDRESS)
assert handler.has_connected_users()
def test_relay_users_time_out():
handler = RelayUserHandler(client_disconnect_timeout=0.001)
handler.add_relay_user(USER_ADDRESS)
sleep(0.003)
assert not handler.has_connected_users()