From 6cb9d4808f6cb0f706f891893defd5c70b61c3c7 Mon Sep 17 00:00:00 2001 From: Shreya Date: Wed, 18 Nov 2020 23:34:28 +0530 Subject: [PATCH] PostgreSQL communication encryption fingerprinting --- .../network/postgresql_fingerprint.py | 73 ++++++++++++------- 1 file changed, 48 insertions(+), 25 deletions(-) diff --git a/monkey/infection_monkey/network/postgresql_fingerprint.py b/monkey/infection_monkey/network/postgresql_fingerprint.py index 1305e26c1..4ab42f754 100644 --- a/monkey/infection_monkey/network/postgresql_fingerprint.py +++ b/monkey/infection_monkey/network/postgresql_fingerprint.py @@ -29,39 +29,62 @@ class PostgreSQLFinger(HostFinger): password=self.CREDS['password'], sslmode='prefer') # don't need to worry about DB name; creds are wrong, won't check - except psycopg2.OperationalError as ex: # try block will throw an OperationalError since the credentials are wrong + except psycopg2.OperationalError as ex: + # try block will throw an OperationalError since the credentials are wrong, which we then analyze + self.relevant_ex_substrings = ["password authentication failed", + "entry for host"] # "no pg_hba.conf entry for host" but filename may be diff exception_string = str(ex) - relevant_ex_substrings = ["password authentication failed", - "entry for host"] # "no pg_hba.conf entry for host" but filename may be diff - if not any(substr in exception_string for substr in relevant_ex_substrings): + if not any(substr in exception_string for substr in self.relevant_ex_substrings): # OperationalError due to some other reason return False self.init_service(host.services, self._SCANNED_SERVICE, self.POSTGRESQL_DEFAULT_PORT) - """ - ---> split exception_string by \n + ssl_connection_details = [] + exceptions = exception_string.split("\n") + ssl_conf_on_server = self.is_ssl_configured(exceptions) - if len == 1: ssl_conf_on_server = False - if "password authentication failed" is present: ssl_forced = False - elif "entry for host" is present: ssl_forced = True - if len == 2: ssl_conf_on_server = True - // for [0] - if "password authentication failed" is present: ssl_all = True - elif "entry for host" is present: ssl_forced = False - // for [1] - if "password authentication failed" is present: nossl_all = True - elif "entry for host" is present: nossl_forced = False + """ Make this part cleaner and better! """ - ---> def is_ssl_configured(): - // check length after splitting - ---> def is_ssl_exists(): - if is_ssl_configured(): // checks twice - once for SSL entry, once for no SSL entry - koi_function() for [0]th // kisi function mein if-elif waala daal do upar jo likha hai - koi_function() for [-1]th + # SSL configured + if ssl_conf_on_server: + ssl_connection_details.append("SSL is configured on the PostgreSQL server.\n") + # SSL + if self.found_entry_for_host_but_pwd_auth_failed(exceptions[0]): + ssl_connection_details.append("SSL connections can be made by all.\n") + else: + ssl_connection_details.append( + "SSL connections can be made by selected hosts only OR non-SSL usage is forced.\n") + # non-SSL + if self.found_entry_for_host_but_pwd_auth_failed(exceptions[1]): + ssl_connection_details.append("Non-SSL connections can be made by all.\n") + else: + ssl_connection_details.append( + "Non-SSL connections can be made by selected hosts only OR SSL usage is forced.\n") - // how do i make deriving the results simpler and shorter?! - """ + # SSL not configured + else: + ssl_connection_details.append("SSL is NOT configured on the PostgreSQL server.\n") + if self.found_entry_for_host_but_pwd_auth_failed(exceptions[0]): + ssl_connection_details.append("Non-SSL connections can be made by all.\n") + else: + ssl_connection_details.append( + "Non-SSL connections can be made by selected hosts only OR SSL usage is forced.\n") - # LOG.info(f'Exception: {ex}') + host.services[self._SCANNED_SERVICE]['communication_encryption_details'] = ''.join(ssl_connection_details) + + return True + + def is_ssl_configured(self, exceptions): + # when trying to authenticate, it checks pg_hba.conf file: + # first, for a record where it can connect with SSL and second, without SSL + if len(exceptions) == 1: # SSL not configured on server so only checks for non-SSL record + return False + elif len(exceptions) == 2: # SSL configured so checks for both + return True + + def found_entry_for_host_but_pwd_auth_failed(self, exception): + if self.relevant_ex_substrings[0] in exception: + return True # entry found in pg_hba.conf file but password authentication failed + return False # entry not found in pg_hba.conf file