From 64264c9a19b7dae6cd1d5e112177cdbcafafc93c Mon Sep 17 00:00:00 2001 From: Mariusz Felisiak Date: Mon, 10 Apr 2017 19:04:00 +0200 Subject: [PATCH] Fixed #25406 -- Removed exception hiding in PostgreSQL test database creation during --keepdb. Thanks Markus Holtermann and Tim Graham for reviews. --- django/db/backends/base/creation.py | 22 ++++++++-------- django/db/backends/postgresql/creation.py | 31 ++++++++++++++++------- tests/backends/test_creation.py | 29 +++++++++++++++++++++ 3 files changed, 61 insertions(+), 21 deletions(-) diff --git a/django/db/backends/base/creation.py b/django/db/backends/base/creation.py index 36535ae70e6..7158ff62576 100644 --- a/django/db/backends/base/creation.py +++ b/django/db/backends/base/creation.py @@ -148,21 +148,22 @@ class BaseDatabaseCreation: return self.connection.settings_dict['TEST']['NAME'] return TEST_DATABASE_PREFIX + self.connection.settings_dict['NAME'] + def _execute_create_test_db(self, cursor, parameters, keepdb=False): + cursor.execute('CREATE DATABASE %(dbname)s %(suffix)s' % parameters) + def _create_test_db(self, verbosity, autoclobber, keepdb=False): """ Internal implementation - create the test db tables. """ - suffix = self.sql_table_creation_suffix() - test_database_name = self._get_test_db_name() - - qn = self.connection.ops.quote_name - + test_db_params = { + 'dbname': self.connection.ops.quote_name(test_database_name), + 'suffix': self.sql_table_creation_suffix(), + } # Create the test database and connect to it. with self._nodb_connection.cursor() as cursor: try: - cursor.execute( - "CREATE DATABASE %s %s" % (qn(test_database_name), suffix)) + self._execute_create_test_db(cursor, test_db_params, keepdb) except Exception as e: # if we want to keep the db, then no need to do any of the below, # just return and skip it all. @@ -181,11 +182,8 @@ class BaseDatabaseCreation: print("Destroying old test database for alias %s..." % ( self._get_database_display_str(verbosity, test_database_name), )) - cursor.execute( - "DROP DATABASE %s" % qn(test_database_name)) - cursor.execute( - "CREATE DATABASE %s %s" % (qn(test_database_name), - suffix)) + cursor.execute('DROP DATABASE %(dbname)s' % test_db_params) + self._execute_create_test_db(cursor, test_db_params, keepdb) except Exception as e: sys.stderr.write( "Got an error recreating the test database: %s\n" % e) diff --git a/django/db/backends/postgresql/creation.py b/django/db/backends/postgresql/creation.py index c47bb534f2d..27963e94626 100644 --- a/django/db/backends/postgresql/creation.py +++ b/django/db/backends/postgresql/creation.py @@ -28,6 +28,20 @@ class DatabaseCreation(BaseDatabaseCreation): template=test_settings.get('TEMPLATE'), ) + def _execute_create_test_db(self, cursor, parameters, keepdb=False): + try: + super()._execute_create_test_db(cursor, parameters, keepdb) + except Exception as e: + exc_msg = 'database %s already exists' % parameters['dbname'] + if exc_msg not in str(e): + # All errors except "database already exists" cancel tests + sys.stderr.write('Got an error creating the test database: %s\n' % e) + sys.exit(2) + elif not keepdb: + # If the database should be kept, ignore "database already + # exists". + raise e + def _clone_test_db(self, number, verbosity, keepdb=False): # CREATE DATABASE ... WITH TEMPLATE ... requires closing connections # to the template database. @@ -35,22 +49,21 @@ class DatabaseCreation(BaseDatabaseCreation): source_database_name = self.connection.settings_dict['NAME'] target_database_name = self.get_test_db_clone_settings(number)['NAME'] - suffix = self._get_database_create_suffix(template=source_database_name) - creation_sql = "CREATE DATABASE {} {}".format(self._quote_name(target_database_name), suffix) - + test_db_params = { + 'dbname': self._quote_name(target_database_name), + 'suffix': self._get_database_create_suffix(template=source_database_name), + } with self._nodb_connection.cursor() as cursor: try: - cursor.execute(creation_sql) - except Exception: - if keepdb: - return + self._execute_create_test_db(cursor, test_db_params, keepdb) + except Exception as e: try: if verbosity >= 1: print("Destroying old test database for alias %s..." % ( self._get_database_display_str(verbosity, target_database_name), )) - cursor.execute("DROP DATABASE %s" % self._quote_name(target_database_name)) - cursor.execute(creation_sql) + cursor.execute('DROP DATABASE %(dbname)s' % test_db_params) + self._execute_create_test_db(cursor, test_db_params, keepdb) except Exception as e: sys.stderr.write("Got an error cloning the test database: %s\n" % e) sys.exit(2) diff --git a/tests/backends/test_creation.py b/tests/backends/test_creation.py index 9c8c32e043a..f96102117b5 100644 --- a/tests/backends/test_creation.py +++ b/tests/backends/test_creation.py @@ -95,6 +95,35 @@ class PostgreSQLDatabaseCreationTests(SimpleTestCase): settings = dict(CHARSET='UTF8', TEMPLATE='template0') self.check_sql_table_creation_suffix(settings, '''WITH ENCODING 'UTF8' TEMPLATE "template0"''') + def _execute_raise_database_already_exists(self, cursor, parameters, keepdb=False): + raise DatabaseError('database %s already exists' % parameters['dbname']) + + def _execute_raise_permission_denied(self, cursor, parameters, keepdb=False): + raise DatabaseError('permission denied to create database') + + def patch_test_db_creation(self, execute_create_test_db): + return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db) + + @mock.patch('sys.stdout', new_callable=StringIO) + @mock.patch('sys.stderr', new_callable=StringIO) + def test_create_test_db(self, *mocked_objects): + creation = PostgreSQLDatabaseCreation(connection) + # Simulate test database creation raising "database already exists" + with self.patch_test_db_creation(self._execute_raise_database_already_exists): + with mock.patch('builtins.input', return_value='no'): + with self.assertRaises(SystemExit): + # SystemExit is raised if the user answers "no" to the + # prompt asking if it's okay to delete the test database. + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False) + # "Database already exists" error is ignored when keepdb is on + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True) + # Simulate test database creation raising unexpected error + with self.patch_test_db_creation(self._execute_raise_permission_denied): + with self.assertRaises(SystemExit): + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False) + with self.assertRaises(SystemExit): + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True) + @unittest.skipUnless(connection.vendor == 'oracle', "Oracle specific tests") @mock.patch.object(OracleDatabaseCreation, '_maindb_connection', return_value=connection)