diff --git a/monkey/infection_monkey/network/test_postgresql_finger.py b/monkey/infection_monkey/network/test_postgresql_finger.py index 632541257..f0bf5998f 100644 --- a/monkey/infection_monkey/network/test_postgresql_finger.py +++ b/monkey/infection_monkey/network/test_postgresql_finger.py @@ -1,11 +1,11 @@ -from unittest import TestCase -from unittest.mock import Mock +import pytest +import infection_monkey.network.postgresql_finger from infection_monkey.network.postgresql_finger import PostgreSQLFinger IRRELEVANT_EXCEPTION_STRING = "This is an irrelevant exception string." -RELEVANT_EXCEPTION_STRINGS =\ +_RELEVANT_EXCEPTION_STRING_PARTS =\ { 'pwd_auth_failed': 'FATAL: password authentication failed for user "root"', 'ssl_on_entry_not_found': 'FATAL: no pg_hba.conf entry for host "127.0.0.1",' @@ -14,7 +14,21 @@ RELEVANT_EXCEPTION_STRINGS =\ 'user "random", database "postgres", SSL off' } -RESULT_STRINGS =\ +_RELEVANT_EXCEPTION_STRINGS =\ + { + 'pwd_auth_failed': _RELEVANT_EXCEPTION_STRING_PARTS['pwd_auth_failed'], + 'ssl_off_entry_not_found': _RELEVANT_EXCEPTION_STRING_PARTS['ssl_off_entry_not_found'], + 'pwd_auth_failed_pwd_auth_failed': '\n'.join([_RELEVANT_EXCEPTION_STRING_PARTS['pwd_auth_failed'], + _RELEVANT_EXCEPTION_STRING_PARTS['pwd_auth_failed']]), + 'pwd_auth_failed_ssl_off_entry_not_found': '\n'.join([_RELEVANT_EXCEPTION_STRING_PARTS['pwd_auth_failed'], + _RELEVANT_EXCEPTION_STRING_PARTS['ssl_off_entry_not_found']]), + 'ssl_on_entry_not_found_pwd_auth_failed': '\n'.join([_RELEVANT_EXCEPTION_STRING_PARTS['ssl_on_entry_not_found'], + _RELEVANT_EXCEPTION_STRING_PARTS['pwd_auth_failed']]), + 'ssl_on_entry_not_found_ssl_off_entry_not_found': '\n'.join([_RELEVANT_EXCEPTION_STRING_PARTS['ssl_on_entry_not_found'], + _RELEVANT_EXCEPTION_STRING_PARTS['ssl_off_entry_not_found']]) + } + +_RESULT_STRINGS =\ { 'ssl_conf': "SSL is configured on the PostgreSQL server.\n", 'ssl_not_conf': "SSL is NOT configured on the PostgreSQL server.\n", @@ -27,64 +41,111 @@ RESULT_STRINGS =\ 'only_selected': "Only selected hosts can make connections (SSL or non-SSL).\n" } -EXAMPLE_EXCEPTIONS_WITH_EXPECTED_RESULTS =\ +RELEVANT_EXCEPTIONS_WITH_EXPECTED_RESULTS =\ { - # SSL not configured, all non-SSL allowed, # SSL not configured, all non-SSL allowed - RELEVANT_EXCEPTION_STRINGS['pwd_auth_failed']: [ - RESULT_STRINGS['ssl_not_conf'], - RESULT_STRINGS['all_non_ssl'] + # SSL not configured, all non-SSL allowed + _RELEVANT_EXCEPTION_STRINGS['pwd_auth_failed']: [ + _RESULT_STRINGS['ssl_not_conf'], + _RESULT_STRINGS['all_non_ssl'] ], # SSL not configured, selected non-SSL allowed - RELEVANT_EXCEPTION_STRINGS['ssl_off_entry_not_found']: [ - RESULT_STRINGS['ssl_not_conf'], - RESULT_STRINGS['selected_non_ssl'] + _RELEVANT_EXCEPTION_STRINGS['ssl_off_entry_not_found']: [ + _RESULT_STRINGS['ssl_not_conf'], + _RESULT_STRINGS['selected_non_ssl'] ], # all SSL allowed, all non-SSL allowed - '\n'.join([RELEVANT_EXCEPTION_STRINGS['pwd_auth_failed'], - RELEVANT_EXCEPTION_STRINGS['pwd_auth_failed']]): [ - RESULT_STRINGS['ssl_conf'], - RESULT_STRINGS['all_ssl'], - RESULT_STRINGS['all_non_ssl'] + _RELEVANT_EXCEPTION_STRINGS['pwd_auth_failed_pwd_auth_failed']: [ + _RESULT_STRINGS['ssl_conf'], + _RESULT_STRINGS['all_ssl'], + _RESULT_STRINGS['all_non_ssl'] ], # all SSL allowed, selected non-SSL allowed - '\n'.join([RELEVANT_EXCEPTION_STRINGS['pwd_auth_failed'], - RELEVANT_EXCEPTION_STRINGS['ssl_off_entry_not_found']]): [ - RESULT_STRINGS['ssl_conf'], - RESULT_STRINGS['all_ssl'], - RESULT_STRINGS['selected_non_ssl'] + _RELEVANT_EXCEPTION_STRINGS['pwd_auth_failed_ssl_off_entry_not_found']: [ + _RESULT_STRINGS['ssl_conf'], + _RESULT_STRINGS['all_ssl'], + _RESULT_STRINGS['selected_non_ssl'] ], # selected SSL allowed, all non-SSL allowed - '\n'.join([RELEVANT_EXCEPTION_STRINGS['ssl_on_entry_not_found'], - RELEVANT_EXCEPTION_STRINGS['pwd_auth_failed']]): [ - RESULT_STRINGS['ssl_conf'], - RESULT_STRINGS['selected_ssl'], - RESULT_STRINGS['all_non_ssl'] + _RELEVANT_EXCEPTION_STRINGS['ssl_on_entry_not_found_pwd_auth_failed']: [ + _RESULT_STRINGS['ssl_conf'], + _RESULT_STRINGS['selected_ssl'], + _RESULT_STRINGS['all_non_ssl'] ], # selected SSL allowed, selected non-SSL allowed - '\n'.join([RELEVANT_EXCEPTION_STRINGS['ssl_on_entry_not_found'], - RELEVANT_EXCEPTION_STRINGS['ssl_off_entry_not_found']]): [ - RESULT_STRINGS['ssl_conf'], - RESULT_STRINGS['only_selected'] + _RELEVANT_EXCEPTION_STRINGS['ssl_on_entry_not_found_ssl_off_entry_not_found']: [ + _RESULT_STRINGS['ssl_conf'], + _RESULT_STRINGS['only_selected'] ] } -class TestPostgreSQLFinger(TestCase): - def test_is_relevant_exception(self): - assert PostgreSQLFinger()._is_relevant_exception(IRRELEVANT_EXCEPTION_STRING) is False - for exception_string in EXAMPLE_EXCEPTIONS_WITH_EXPECTED_RESULTS: - assert PostgreSQLFinger()._is_relevant_exception(exception_string) is True +@pytest.fixture +def mock_PostgreSQLFinger(): + return PostgreSQLFinger() - def test_analyze_operational_error(self): - host = Mock(['services']) - host.services = {} - for exception_string in EXAMPLE_EXCEPTIONS_WITH_EXPECTED_RESULTS: - with self.subTest(msg=f"Checking result for exception: {exception_string}"): - PostgreSQLFinger().analyze_operational_error(host, exception_string) - assert host.services['PostgreSQL']['communication_encryption_details'] ==\ - ''.join(EXAMPLE_EXCEPTIONS_WITH_EXPECTED_RESULTS[exception_string]) + +class DummyHost: + def __init__(self): + self.services = {} + + +@pytest.fixture +def host(): + return DummyHost() + + +def test_irrelevant_exception(mock_PostgreSQLFinger): + assert mock_PostgreSQLFinger._is_relevant_exception(IRRELEVANT_EXCEPTION_STRING) is False + + +def test_exception_ssl_not_configured_all_non_ssl_allowed(mock_PostgreSQLFinger, host): + exception = _RELEVANT_EXCEPTION_STRINGS['pwd_auth_failed'] + assert mock_PostgreSQLFinger._is_relevant_exception(exception) is True + + result = mock_PostgreSQLFinger.analyze_operational_error(host, exception) + assert host.services[mock_PostgreSQLFinger._SCANNED_SERVICE]['communication_encryption_details'] == ''.join(RELEVANT_EXCEPTIONS_WITH_EXPECTED_RESULTS[exception]) + + +def test_exception_ssl_not_configured_selected_non_ssl_allowed(mock_PostgreSQLFinger, host): + exception = _RELEVANT_EXCEPTION_STRINGS['ssl_off_entry_not_found'] + assert mock_PostgreSQLFinger._is_relevant_exception(exception) is True + + result = mock_PostgreSQLFinger.analyze_operational_error(host, exception) + assert host.services[mock_PostgreSQLFinger._SCANNED_SERVICE]['communication_encryption_details'] == ''.join(RELEVANT_EXCEPTIONS_WITH_EXPECTED_RESULTS[exception]) + + +def test_exception_all_ssl_allowed_all_non_ssl_allowed(mock_PostgreSQLFinger, host): + exception = _RELEVANT_EXCEPTION_STRINGS['pwd_auth_failed_pwd_auth_failed'] + assert mock_PostgreSQLFinger._is_relevant_exception(exception) is True + + result = mock_PostgreSQLFinger.analyze_operational_error(host, exception) + assert host.services[mock_PostgreSQLFinger._SCANNED_SERVICE]['communication_encryption_details'] == ''.join(RELEVANT_EXCEPTIONS_WITH_EXPECTED_RESULTS[exception]) + + +def test_exception_all_ssl_allowed_selected_non_ssl_allowed(mock_PostgreSQLFinger, host): + exception = _RELEVANT_EXCEPTION_STRINGS['pwd_auth_failed_ssl_off_entry_not_found'] + assert mock_PostgreSQLFinger._is_relevant_exception(exception) is True + + result = mock_PostgreSQLFinger.analyze_operational_error(host, exception) + assert host.services[mock_PostgreSQLFinger._SCANNED_SERVICE]['communication_encryption_details'] == ''.join(RELEVANT_EXCEPTIONS_WITH_EXPECTED_RESULTS[exception]) + + +def test_exception_selected_ssl_allowed_all_non_ssl_allowed(mock_PostgreSQLFinger, host): + exception = _RELEVANT_EXCEPTION_STRINGS['ssl_on_entry_not_found_pwd_auth_failed'] + assert mock_PostgreSQLFinger._is_relevant_exception(exception) is True + + result = mock_PostgreSQLFinger.analyze_operational_error(host, exception) + assert host.services[mock_PostgreSQLFinger._SCANNED_SERVICE]['communication_encryption_details'] == ''.join(RELEVANT_EXCEPTIONS_WITH_EXPECTED_RESULTS[exception]) + + +def test_exception_selected_ssl_allowed_selected_non_ssl_allowed(mock_PostgreSQLFinger, host): + exception = _RELEVANT_EXCEPTION_STRINGS['ssl_on_entry_not_found_ssl_off_entry_not_found'] + assert mock_PostgreSQLFinger._is_relevant_exception(exception) is True + + result = mock_PostgreSQLFinger.analyze_operational_error(host, exception) + assert host.services[mock_PostgreSQLFinger._SCANNED_SERVICE]['communication_encryption_details'] == ''.join(RELEVANT_EXCEPTIONS_WITH_EXPECTED_RESULTS[exception])