From 27263cbb485000fce74169f633dda6b7e27bc198 Mon Sep 17 00:00:00 2001 From: Shreya Date: Sat, 19 Dec 2020 17:20:10 +0530 Subject: [PATCH] Readability changes (per CR) --- .../network/postgresql_fingerprint.py | 129 ++++++++++-------- 1 file changed, 71 insertions(+), 58 deletions(-) diff --git a/monkey/infection_monkey/network/postgresql_fingerprint.py b/monkey/infection_monkey/network/postgresql_fingerprint.py index c696d561f..e6b9b41ad 100644 --- a/monkey/infection_monkey/network/postgresql_fingerprint.py +++ b/monkey/infection_monkey/network/postgresql_fingerprint.py @@ -2,6 +2,7 @@ import logging import psycopg2 +from infection_monkey.model import ID_STRING from infection_monkey.network.HostFinger import HostFinger LOG = logging.getLogger(__name__) @@ -14,73 +15,41 @@ class PostgreSQLFinger(HostFinger): # Class related consts _SCANNED_SERVICE = 'PostgreSQL' POSTGRESQL_DEFAULT_PORT = 5432 - CREDS = {'username': "monkeySaysHello", - 'password': "monkeySaysXXX"} + CREDS = {'username': ID_STRING, + 'password': ID_STRING} + CONNECTION_DETAILS =\ + { + 'ssl_conf': "SSL is configured on the PostgreSQL server.\n", + 'ssl_not_conf': "SSL is NOT configured on the PostgreSQL server.\n", + 'all_ssl': "SSL connections can be made by all.\n", + 'all_non_ssl': "Non-SSL connections can be made by all.\n", + 'selected_ssl': "SSL connections can be made by selected hosts only OR " + "non-SSL usage is forced.\n", + 'selected_non_ssl': "Non-SSL connections can be made by selected hosts only OR " + "SSL usage is forced.\n", + 'only_selected': "Only selected hosts can make connections (SSL or non-SSL).\n" + } + RELEVANT_EX_SUBSTRINGS = ["password authentication failed", + "entry for host"] # "no pg_hba.conf entry for host" but filename may be diff def get_host_fingerprint(self, host): try: - connection = psycopg2.connect(host=host.ip_addr, - port=self.POSTGRESQL_DEFAULT_PORT, - user=self.CREDS['username'], - password=self.CREDS['password'], - sslmode='prefer') # don't need to worry about DB name; creds are wrong, won't check + psycopg2.connect(host=host.ip_addr, + port=self.POSTGRESQL_DEFAULT_PORT, + user=self.CREDS['username'], + password=self.CREDS['password'], + sslmode='prefer') # don't need to worry about DB name; creds are wrong, won't check except psycopg2.OperationalError as ex: # try block will throw an OperationalError since the credentials are wrong, which we then analyze try: - self.relevant_ex_substrings = ["password authentication failed", - "entry for host"] # "no pg_hba.conf entry for host" but filename may be diff exception_string = str(ex) - if not any(substr in exception_string for substr in self.relevant_ex_substrings): - # OperationalError due to some other reason + if not self.is_relevant_exception(exception_string): return False - # all's well; start analysing errors - self.init_service(host.services, self._SCANNED_SERVICE, self.POSTGRESQL_DEFAULT_PORT) - - exceptions = exception_string.split("\n") - connection_details = {'ssl_conf': "SSL is configured on the PostgreSQL server.\n", - 'ssl_not_conf': "SSL is NOT configured on the PostgreSQL server.\n", - 'all_ssl': "SSL connections can be made by all.\n", - 'all_non_ssl': "Non-SSL connections can be made by all.\n", - 'selected_ssl': "SSL connections can be made by selected hosts only OR " - "non-SSL usage is forced.\n", - 'selected_non_ssl': "Non-SSL connections can be made by selected hosts only OR " - "SSL usage is forced.\n", - 'only_selected': "Only selected hosts can make connections (SSL or non-SSL).\n"} - - ssl_connection_details = [] - ssl_conf_on_server = self.is_ssl_configured(exceptions) - - # SSL configured - if ssl_conf_on_server: - ssl_connection_details.append(connection_details['ssl_conf']) - # SSL - ssl_selected_comms_only = False - if self.found_entry_for_host_but_pwd_auth_failed(exceptions[0]): - ssl_connection_details.append(connection_details['all_ssl']) - else: - ssl_connection_details.append(connection_details['selected_ssl']) - ssl_selected_comms_only = True - # non-SSL - if self.found_entry_for_host_but_pwd_auth_failed(exceptions[1]): - ssl_connection_details.append(connection_details['all_non_ssl']) - else: - if ssl_selected_comms_only: # if only selected SSL allowed and only selected non-SSL allowed - ssl_connection_details[-1] = connection_details['only_selected'] - else: - ssl_connection_details.append(connection_details['selected_non_ssl']) - - # SSL not configured - else: - ssl_connection_details.append(connection_details['ssl_not_conf']) - if self.found_entry_for_host_but_pwd_auth_failed(exceptions[0]): - ssl_connection_details.append(connection_details['all_non_ssl']) - else: - ssl_connection_details.append(connection_details['selected_non_ssl']) - - host.services[self._SCANNED_SERVICE]['communication_encryption_details'] = ''.join(ssl_connection_details) + # all's well; start analyzing errors + self.analyze_operational_error(host, exception_string) return True except Exception as err: @@ -88,7 +57,51 @@ class PostgreSQLFinger(HostFinger): return False - def is_ssl_configured(self, exceptions): + def is_relevant_exception(self, exception_string): + if not any(substr in exception_string for substr in self.RELEVANT_EX_SUBSTRINGS): + # OperationalError due to some other reason - irrelevant exception + return False + return True + + def analyze_operational_error(self, host, exception_string): + self.init_service(host.services, self._SCANNED_SERVICE, self.POSTGRESQL_DEFAULT_PORT) + + exceptions = exception_string.split("\n") + + ssl_connection_details = [] + ssl_conf_on_server = self.is_ssl_configured(exceptions) + + # SSL configured + if ssl_conf_on_server: + ssl_connection_details.append(self.CONNECTION_DETAILS['ssl_conf']) + # SSL + ssl_selected_comms_only = False + if self.found_entry_for_host_but_pwd_auth_failed(exceptions[0]): + ssl_connection_details.append(self.CONNECTION_DETAILS['all_ssl']) + else: + ssl_connection_details.append(self.CONNECTION_DETAILS['selected_ssl']) + ssl_selected_comms_only = True + # non-SSL + if self.found_entry_for_host_but_pwd_auth_failed(exceptions[1]): + ssl_connection_details.append(self.CONNECTION_DETAILS['all_non_ssl']) + else: + if ssl_selected_comms_only: # if only selected SSL allowed and only selected non-SSL allowed + ssl_connection_details[-1] = self.CONNECTION_DETAILS['only_selected'] + else: + ssl_connection_details.append(self.CONNECTION_DETAILS['selected_non_ssl']) + + # SSL not configured + else: + ssl_connection_details.append(self.CONNECTION_DETAILS['ssl_not_conf']) + if self.found_entry_for_host_but_pwd_auth_failed(exceptions[0]): + ssl_connection_details.append(self.CONNECTION_DETAILS['all_non_ssl']) + else: + ssl_connection_details.append(self.CONNECTION_DETAILS['selected_non_ssl']) + + host.services[self._SCANNED_SERVICE]['communication_encryption_details'] = ''.join(ssl_connection_details) + + @staticmethod + def is_ssl_configured(exceptions): # when trying to authenticate, it checks pg_hba.conf file: # first, for a record where it can connect with SSL and second, without SSL if len(exceptions) == 1: # SSL not configured on server so only checks for non-SSL record @@ -97,6 +110,6 @@ class PostgreSQLFinger(HostFinger): return True def found_entry_for_host_but_pwd_auth_failed(self, exception): - if self.relevant_ex_substrings[0] in exception: + if self.RELEVANT_EX_SUBSTRINGS[0] in exception: return True # entry found in pg_hba.conf file but password authentication failed return False # entry not found in pg_hba.conf file