Agent: Add timeouts and improve firewall rule handling code

This commit is contained in:
vakarisz 2022-04-05 14:52:35 +03:00
parent 4497ea0003
commit e40703dcac
1 changed files with 25 additions and 13 deletions

View File

@ -1,18 +1,23 @@
import logging
import platform import platform
import subprocess import subprocess
import sys import sys
from common.common_consts.timeouts import SHORT_REQUEST_TIMEOUT
logger = logging.getLogger(__name__)
def _run_netsh_cmd(command, args): def _run_netsh_cmd(command, args):
cmd = subprocess.Popen( output = subprocess.check_output(
"netsh %s %s" "netsh %s %s"
% ( % (
command, command,
" ".join(['%s="%s"' % (key, value) for key, value in list(args.items()) if value]), " ".join(['%s="%s"' % (key, value) for key, value in list(args.items()) if value]),
), ),
stdout=subprocess.PIPE, timeout=SHORT_REQUEST_TIMEOUT,
) )
return cmd.stdout.read().strip().lower().endswith("ok.") return output.strip().lower().endswith("ok.")
class FirewallApp(object): class FirewallApp(object):
@ -44,15 +49,19 @@ class WinAdvFirewall(FirewallApp):
def is_enabled(self): def is_enabled(self):
try: try:
cmd = subprocess.Popen("netsh advfirewall show currentprofile", stdout=subprocess.PIPE) out = subprocess.check_output(
out = cmd.stdout.readlines() "netsh advfirewall show currentprofile", timeout=SHORT_REQUEST_TIMEOUT
)
except subprocess.TimeoutExpired:
return None
except Exception:
return None
for line in out: for line in out.decode().splitlines():
if line.startswith("State"): if line.startswith("State"):
state = line.split()[-1].strip() state = line.split()[-1].strip()
return state == "ON" return state == "ON"
except Exception:
return None return None
def add_firewall_rule( def add_firewall_rule(
@ -66,7 +75,10 @@ class WinAdvFirewall(FirewallApp):
return True return True
else: else:
return False return False
except Exception: except subprocess.CalledProcessError as err:
logger.info(f"Failed adding a firewall rule: {err.stdout}")
except subprocess.TimeoutExpired:
logger.info("Timeout expired trying to add a firewall rule.")
return None return None
def remove_firewall_rule(self, name="Firewall", **kwargs): def remove_firewall_rule(self, name="Firewall", **kwargs):