forked from p34709852/monkey
PostgreSQL communication encryption fingerprinting
This commit is contained in:
parent
edc1b779d3
commit
6cb9d4808f
|
@ -29,39 +29,62 @@ class PostgreSQLFinger(HostFinger):
|
||||||
password=self.CREDS['password'],
|
password=self.CREDS['password'],
|
||||||
sslmode='prefer') # don't need to worry about DB name; creds are wrong, won't check
|
sslmode='prefer') # don't need to worry about DB name; creds are wrong, won't check
|
||||||
|
|
||||||
except psycopg2.OperationalError as ex: # try block will throw an OperationalError since the credentials are wrong
|
except psycopg2.OperationalError as ex:
|
||||||
exception_string = str(ex)
|
# try block will throw an OperationalError since the credentials are wrong, which we then analyze
|
||||||
relevant_ex_substrings = ["password authentication failed",
|
self.relevant_ex_substrings = ["password authentication failed",
|
||||||
"entry for host"] # "no pg_hba.conf entry for host" but filename may be diff
|
"entry for host"] # "no pg_hba.conf entry for host" but filename may be diff
|
||||||
|
exception_string = str(ex)
|
||||||
|
|
||||||
if not any(substr in exception_string for substr in relevant_ex_substrings):
|
if not any(substr in exception_string for substr in self.relevant_ex_substrings):
|
||||||
# OperationalError due to some other reason
|
# OperationalError due to some other reason
|
||||||
return False
|
return False
|
||||||
|
|
||||||
self.init_service(host.services, self._SCANNED_SERVICE, self.POSTGRESQL_DEFAULT_PORT)
|
self.init_service(host.services, self._SCANNED_SERVICE, self.POSTGRESQL_DEFAULT_PORT)
|
||||||
|
|
||||||
"""
|
ssl_connection_details = []
|
||||||
---> split exception_string by \n
|
exceptions = exception_string.split("\n")
|
||||||
|
ssl_conf_on_server = self.is_ssl_configured(exceptions)
|
||||||
|
|
||||||
if len == 1: ssl_conf_on_server = False
|
""" Make this part cleaner and better! """
|
||||||
if "password authentication failed" is present: ssl_forced = False
|
|
||||||
elif "entry for host" is present: ssl_forced = True
|
|
||||||
if len == 2: ssl_conf_on_server = True
|
|
||||||
// for [0]
|
|
||||||
if "password authentication failed" is present: ssl_all = True
|
|
||||||
elif "entry for host" is present: ssl_forced = False
|
|
||||||
// for [1]
|
|
||||||
if "password authentication failed" is present: nossl_all = True
|
|
||||||
elif "entry for host" is present: nossl_forced = False
|
|
||||||
|
|
||||||
---> def is_ssl_configured():
|
# SSL configured
|
||||||
// check length after splitting
|
if ssl_conf_on_server:
|
||||||
---> def is_ssl_exists():
|
ssl_connection_details.append("SSL is configured on the PostgreSQL server.\n")
|
||||||
if is_ssl_configured(): // checks twice - once for SSL entry, once for no SSL entry
|
# SSL
|
||||||
koi_function() for [0]th // kisi function mein if-elif waala daal do upar jo likha hai
|
if self.found_entry_for_host_but_pwd_auth_failed(exceptions[0]):
|
||||||
koi_function() for [-1]th
|
ssl_connection_details.append("SSL connections can be made by all.\n")
|
||||||
|
else:
|
||||||
|
ssl_connection_details.append(
|
||||||
|
"SSL connections can be made by selected hosts only OR non-SSL usage is forced.\n")
|
||||||
|
# non-SSL
|
||||||
|
if self.found_entry_for_host_but_pwd_auth_failed(exceptions[1]):
|
||||||
|
ssl_connection_details.append("Non-SSL connections can be made by all.\n")
|
||||||
|
else:
|
||||||
|
ssl_connection_details.append(
|
||||||
|
"Non-SSL connections can be made by selected hosts only OR SSL usage is forced.\n")
|
||||||
|
|
||||||
// how do i make deriving the results simpler and shorter?!
|
# SSL not configured
|
||||||
"""
|
else:
|
||||||
|
ssl_connection_details.append("SSL is NOT configured on the PostgreSQL server.\n")
|
||||||
|
if self.found_entry_for_host_but_pwd_auth_failed(exceptions[0]):
|
||||||
|
ssl_connection_details.append("Non-SSL connections can be made by all.\n")
|
||||||
|
else:
|
||||||
|
ssl_connection_details.append(
|
||||||
|
"Non-SSL connections can be made by selected hosts only OR SSL usage is forced.\n")
|
||||||
|
|
||||||
# LOG.info(f'Exception: {ex}')
|
host.services[self._SCANNED_SERVICE]['communication_encryption_details'] = ''.join(ssl_connection_details)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def is_ssl_configured(self, exceptions):
|
||||||
|
# when trying to authenticate, it checks pg_hba.conf file:
|
||||||
|
# first, for a record where it can connect with SSL and second, without SSL
|
||||||
|
if len(exceptions) == 1: # SSL not configured on server so only checks for non-SSL record
|
||||||
|
return False
|
||||||
|
elif len(exceptions) == 2: # SSL configured so checks for both
|
||||||
|
return True
|
||||||
|
|
||||||
|
def found_entry_for_host_but_pwd_auth_failed(self, exception):
|
||||||
|
if self.relevant_ex_substrings[0] in exception:
|
||||||
|
return True # entry found in pg_hba.conf file but password authentication failed
|
||||||
|
return False # entry not found in pg_hba.conf file
|
||||||
|
|
Loading…
Reference in New Issue