diff --git a/tests/backends/base/__init__.py b/tests/backends/base/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/backends/base/test_base.py b/tests/backends/base/test_base.py new file mode 100644 index 0000000000..15cfbb8579 --- /dev/null +++ b/tests/backends/base/test_base.py @@ -0,0 +1,32 @@ +from django.db import DEFAULT_DB_ALIAS, connection, connections +from django.db.backends.base.base import BaseDatabaseWrapper +from django.test import SimpleTestCase + + +class DatabaseWrapperTests(SimpleTestCase): + + def test_initialization_class_attributes(self): + """ + The "initialization" class attributes like client_class and + creation_class should be set on the class and reflected in the + corresponding instance attributes of the instantiated backend. + """ + conn = connections[DEFAULT_DB_ALIAS] + conn_class = type(conn) + attr_names = [ + ('client_class', 'client'), + ('creation_class', 'creation'), + ('features_class', 'features'), + ('introspection_class', 'introspection'), + ('ops_class', 'ops'), + ('validation_class', 'validation'), + ] + for class_attr_name, instance_attr_name in attr_names: + class_attr_value = getattr(conn_class, class_attr_name) + self.assertIsNotNone(class_attr_value) + instance_attr_value = getattr(conn, instance_attr_name) + self.assertIsInstance(instance_attr_value, class_attr_value) + + def test_initialization_display_name(self): + self.assertEqual(BaseDatabaseWrapper.display_name, 'unknown') + self.assertNotEqual(connection.display_name, 'unknown') diff --git a/tests/backends/base/test_creation.py b/tests/backends/base/test_creation.py new file mode 100644 index 0000000000..519b3f049c --- /dev/null +++ b/tests/backends/base/test_creation.py @@ -0,0 +1,42 @@ +import copy + +from django.db import DEFAULT_DB_ALIAS, connections +from django.db.backends.base.creation import ( + TEST_DATABASE_PREFIX, BaseDatabaseCreation, +) +from django.test import SimpleTestCase + + +class TestDbSignatureTests(SimpleTestCase): + + def get_connection_copy(self): + # Get a copy of the default connection. (Can't use django.db.connection + # because it'll modify the default connection itself.) + test_connection = copy.copy(connections[DEFAULT_DB_ALIAS]) + test_connection.settings_dict = copy.copy(connections[DEFAULT_DB_ALIAS].settings_dict) + return test_connection + + def test_default_name(self): + # A test db name isn't set. + prod_name = 'hodor' + test_connection = self.get_connection_copy() + test_connection.settings_dict['NAME'] = prod_name + test_connection.settings_dict['TEST'] = {'NAME': None} + signature = BaseDatabaseCreation(test_connection).test_db_signature() + self.assertEqual(signature[3], TEST_DATABASE_PREFIX + prod_name) + + def test_custom_test_name(self): + # A regular test db name is set. + test_name = 'hodor' + test_connection = self.get_connection_copy() + test_connection.settings_dict['TEST'] = {'NAME': test_name} + signature = BaseDatabaseCreation(test_connection).test_db_signature() + self.assertEqual(signature[3], test_name) + + def test_custom_test_name_with_test_prefix(self): + # A test db name prefixed with TEST_DATABASE_PREFIX is set. + test_name = TEST_DATABASE_PREFIX + 'hodor' + test_connection = self.get_connection_copy() + test_connection.settings_dict['TEST'] = {'NAME': test_name} + signature = BaseDatabaseCreation(test_connection).test_db_signature() + self.assertEqual(signature[3], test_name) diff --git a/tests/backends/base/test_features.py b/tests/backends/base/test_features.py new file mode 100644 index 0000000000..831a0002a3 --- /dev/null +++ b/tests/backends/base/test_features.py @@ -0,0 +1,8 @@ +from django.db import connection +from django.test import TestCase + + +class TestDatabaseFeatures(TestCase): + + def test_nonexistent_feature(self): + self.assertFalse(hasattr(connection.features, 'nonexistent')) diff --git a/tests/backends/mysql/__init__.py b/tests/backends/mysql/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/backends/mysql/test_creation.py b/tests/backends/mysql/test_creation.py new file mode 100644 index 0000000000..e3a83346fe --- /dev/null +++ b/tests/backends/mysql/test_creation.py @@ -0,0 +1,45 @@ +import unittest +from io import StringIO +from unittest import mock + +from django.db import connection +from django.db.backends.base.creation import BaseDatabaseCreation +from django.db.backends.mysql.creation import DatabaseCreation +from django.db.utils import DatabaseError +from django.test import SimpleTestCase + + +@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL tests') +class DatabaseCreationTests(SimpleTestCase): + + def _execute_raise_database_exists(self, cursor, parameters, keepdb=False): + raise DatabaseError(1007, "Can't create database '%s'; database exists" % parameters['dbname']) + + def _execute_raise_access_denied(self, cursor, parameters, keepdb=False): + raise DatabaseError(1044, "Access denied for user") + + def patch_test_db_creation(self, execute_create_test_db): + return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db) + + @mock.patch('sys.stdout', new_callable=StringIO) + @mock.patch('sys.stderr', new_callable=StringIO) + def test_create_test_db_database_exists(self, *mocked_objects): + # Simulate test database creation raising "database exists" + creation = DatabaseCreation(connection) + with self.patch_test_db_creation(self._execute_raise_database_exists): + with mock.patch('builtins.input', return_value='no'): + with self.assertRaises(SystemExit): + # SystemExit is raised if the user answers "no" to the + # prompt asking if it's okay to delete the test database. + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False) + # "Database exists" shouldn't appear when keepdb is on + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True) + + @mock.patch('sys.stdout', new_callable=StringIO) + @mock.patch('sys.stderr', new_callable=StringIO) + def test_create_test_db_unexpected_error(self, *mocked_objects): + # Simulate test database creation raising unexpected error + creation = DatabaseCreation(connection) + with self.patch_test_db_creation(self._execute_raise_access_denied): + with self.assertRaises(SystemExit): + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False) diff --git a/tests/backends/test_features.py b/tests/backends/mysql/test_features.py similarity index 67% rename from tests/backends/test_features.py rename to tests/backends/mysql/test_features.py index 7bd57f0423..65c897823b 100644 --- a/tests/backends/test_features.py +++ b/tests/backends/mysql/test_features.py @@ -4,16 +4,10 @@ from django.db import connection from django.test import TestCase -class TestDatabaseFeatures(TestCase): +@skipUnless(connection.vendor == 'mysql', 'MySQL tests') +class TestFeatures(TestCase): - def test_nonexistent_feature(self): - self.assertFalse(hasattr(connection.features, 'nonexistent')) - - -@skipUnless(connection.vendor == 'mysql', 'MySQL backend tests') -class TestMySQLFeatures(TestCase): - - def test_mysql_supports_transactions(self): + def test_supports_transactions(self): """ All storage engines except MyISAM support transactions. """ diff --git a/tests/backends/test_mysql.py b/tests/backends/mysql/tests.py similarity index 97% rename from tests/backends/test_mysql.py rename to tests/backends/mysql/tests.py index 298ca9265f..c9d47eb012 100644 --- a/tests/backends/test_mysql.py +++ b/tests/backends/mysql/tests.py @@ -14,8 +14,8 @@ def get_connection(): @override_settings(DEBUG=True) -@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL specific test.') -class MySQLTests(TestCase): +@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL tests') +class IsolationLevelTests(TestCase): read_committed = 'read committed' repeatable_read = 'repeatable read' diff --git a/tests/backends/oracle/__init__.py b/tests/backends/oracle/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/backends/oracle/test_creation.py b/tests/backends/oracle/test_creation.py new file mode 100644 index 0000000000..1688c4efd2 --- /dev/null +++ b/tests/backends/oracle/test_creation.py @@ -0,0 +1,76 @@ +import unittest +from io import StringIO +from unittest import mock + +from django.db import connection +from django.db.backends.oracle.creation import DatabaseCreation +from django.db.utils import DatabaseError +from django.test import TestCase + + +@unittest.skipUnless(connection.vendor == 'oracle', 'Oracle tests') +@mock.patch.object(DatabaseCreation, '_maindb_connection', return_value=connection) +@mock.patch('sys.stdout', new_callable=StringIO) +@mock.patch('sys.stderr', new_callable=StringIO) +class DatabaseCreationTests(TestCase): + + def _execute_raise_user_already_exists(self, cursor, statements, parameters, verbosity, allow_quiet_fail=False): + # Raise "user already exists" only in test user creation + if statements and statements[0].startswith('CREATE USER'): + raise DatabaseError("ORA-01920: user name 'string' conflicts with another user or role name") + + def _execute_raise_tablespace_already_exists( + self, cursor, statements, parameters, verbosity, allow_quiet_fail=False + ): + raise DatabaseError("ORA-01543: tablespace 'string' already exists") + + def _execute_raise_insufficient_privileges( + self, cursor, statements, parameters, verbosity, allow_quiet_fail=False + ): + raise DatabaseError("ORA-01031: insufficient privileges") + + def _test_database_passwd(self): + # Mocked to avoid test user password changed + return connection.settings_dict['SAVED_PASSWORD'] + + def patch_execute_statements(self, execute_statements): + return mock.patch.object(DatabaseCreation, '_execute_statements', execute_statements) + + @mock.patch.object(DatabaseCreation, '_test_user_create', return_value=False) + def test_create_test_db(self, *mocked_objects): + creation = DatabaseCreation(connection) + # Simulate test database creation raising "tablespace already exists" + with self.patch_execute_statements(self._execute_raise_tablespace_already_exists): + with mock.patch('builtins.input', return_value='no'): + with self.assertRaises(SystemExit): + # SystemExit is raised if the user answers "no" to the + # prompt asking if it's okay to delete the test tablespace. + creation._create_test_db(verbosity=0, keepdb=False) + # "Tablespace already exists" error is ignored when keepdb is on + creation._create_test_db(verbosity=0, keepdb=True) + # Simulate test database creation raising unexpected error + with self.patch_execute_statements(self._execute_raise_insufficient_privileges): + with self.assertRaises(SystemExit): + creation._create_test_db(verbosity=0, keepdb=False) + with self.assertRaises(SystemExit): + creation._create_test_db(verbosity=0, keepdb=True) + + @mock.patch.object(DatabaseCreation, '_test_database_create', return_value=False) + def test_create_test_user(self, *mocked_objects): + creation = DatabaseCreation(connection) + with mock.patch.object(DatabaseCreation, '_test_database_passwd', self._test_database_passwd): + # Simulate test user creation raising "user already exists" + with self.patch_execute_statements(self._execute_raise_user_already_exists): + with mock.patch('builtins.input', return_value='no'): + with self.assertRaises(SystemExit): + # SystemExit is raised if the user answers "no" to the + # prompt asking if it's okay to delete the test user. + creation._create_test_db(verbosity=0, keepdb=False) + # "User already exists" error is ignored when keepdb is on + creation._create_test_db(verbosity=0, keepdb=True) + # Simulate test user creation raising unexpected error + with self.patch_execute_statements(self._execute_raise_insufficient_privileges): + with self.assertRaises(SystemExit): + creation._create_test_db(verbosity=0, keepdb=False) + with self.assertRaises(SystemExit): + creation._create_test_db(verbosity=0, keepdb=True) diff --git a/tests/backends/oracle/tests.py b/tests/backends/oracle/tests.py new file mode 100644 index 0000000000..d57d91e677 --- /dev/null +++ b/tests/backends/oracle/tests.py @@ -0,0 +1,55 @@ +import unittest + +from django.db import connection + + +@unittest.skipUnless(connection.vendor == 'oracle', 'Oracle tests') +class Tests(unittest.TestCase): + + def test_quote_name(self): + """'%' chars are escaped for query execution.""" + name = '"SOME%NAME"' + quoted_name = connection.ops.quote_name(name) + self.assertEqual(quoted_name % (), name) + + def test_dbms_session(self): + """A stored procedure can be called through a cursor wrapper.""" + with connection.cursor() as cursor: + cursor.callproc('DBMS_SESSION.SET_IDENTIFIER', ['_django_testing!']) + + def test_cursor_var(self): + """Cursor variables can be passed as query parameters.""" + from django.db.backends.oracle.base import Database + with connection.cursor() as cursor: + var = cursor.var(Database.STRING) + cursor.execute("BEGIN %s := 'X'; END; ", [var]) + self.assertEqual(var.getvalue(), 'X') + + def test_long_string(self): + """Text longer than 4000 chars can be saved and read.""" + with connection.cursor() as cursor: + cursor.execute('CREATE TABLE ltext ("TEXT" NCLOB)') + long_str = ''.join(str(x) for x in range(4000)) + cursor.execute('INSERT INTO ltext VALUES (%s)', [long_str]) + cursor.execute('SELECT text FROM ltext') + row = cursor.fetchone() + self.assertEqual(long_str, row[0].read()) + cursor.execute('DROP TABLE ltext') + + def test_client_encoding(self): + """Client encoding is set correctly.""" + connection.ensure_connection() + self.assertEqual(connection.connection.encoding, 'UTF-8') + self.assertEqual(connection.connection.nencoding, 'UTF-8') + + def test_order_of_nls_parameters(self): + """ + An 'almost right' datetime works with configured NLS parameters + (#18465). + """ + with connection.cursor() as cursor: + query = "select 1 from dual where '1936-12-29 00:00' < sysdate" + # The query succeeds without errors - pre #18465 this + # wasn't the case. + cursor.execute(query) + self.assertEqual(cursor.fetchone()[0], 1) diff --git a/tests/backends/postgresql/__init__.py b/tests/backends/postgresql/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/backends/postgresql/test_creation.py b/tests/backends/postgresql/test_creation.py new file mode 100644 index 0000000000..9554e97a54 --- /dev/null +++ b/tests/backends/postgresql/test_creation.py @@ -0,0 +1,95 @@ +import unittest +from contextlib import contextmanager +from io import StringIO +from unittest import mock + +from django.db import connection +from django.db.backends.base.creation import BaseDatabaseCreation +from django.db.utils import DatabaseError +from django.test import SimpleTestCase + +try: + import psycopg2 # NOQA +except ImportError: + pass +else: + from psycopg2 import errorcodes + from django.db.backends.postgresql.creation import DatabaseCreation + + +@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL tests') +class DatabaseCreationTests(SimpleTestCase): + + @contextmanager + def changed_test_settings(self, **kwargs): + settings = connection.settings_dict['TEST'] + saved_values = {} + for name in kwargs: + if name in settings: + saved_values[name] = settings[name] + + for name, value in kwargs.items(): + settings[name] = value + try: + yield + finally: + for name, value in kwargs.items(): + if name in saved_values: + settings[name] = saved_values[name] + else: + del settings[name] + + def check_sql_table_creation_suffix(self, settings, expected): + with self.changed_test_settings(**settings): + creation = DatabaseCreation(connection) + suffix = creation.sql_table_creation_suffix() + self.assertEqual(suffix, expected) + + def test_sql_table_creation_suffix_with_none_settings(self): + settings = {'CHARSET': None, 'TEMPLATE': None} + self.check_sql_table_creation_suffix(settings, "") + + def test_sql_table_creation_suffix_with_encoding(self): + settings = {'CHARSET': 'UTF8'} + self.check_sql_table_creation_suffix(settings, "WITH ENCODING 'UTF8'") + + def test_sql_table_creation_suffix_with_template(self): + settings = {'TEMPLATE': 'template0'} + self.check_sql_table_creation_suffix(settings, 'WITH TEMPLATE "template0"') + + def test_sql_table_creation_suffix_with_encoding_and_template(self): + settings = {'CHARSET': 'UTF8', 'TEMPLATE': 'template0'} + self.check_sql_table_creation_suffix(settings, '''WITH ENCODING 'UTF8' TEMPLATE "template0"''') + + def _execute_raise_database_already_exists(self, cursor, parameters, keepdb=False): + error = DatabaseError('database %s already exists' % parameters['dbname']) + error.pgcode = errorcodes.DUPLICATE_DATABASE + raise DatabaseError() from error + + def _execute_raise_permission_denied(self, cursor, parameters, keepdb=False): + error = DatabaseError('permission denied to create database') + error.pgcode = errorcodes.INSUFFICIENT_PRIVILEGE + raise DatabaseError() from error + + def patch_test_db_creation(self, execute_create_test_db): + return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db) + + @mock.patch('sys.stdout', new_callable=StringIO) + @mock.patch('sys.stderr', new_callable=StringIO) + def test_create_test_db(self, *mocked_objects): + creation = DatabaseCreation(connection) + # Simulate test database creation raising "database already exists" + with self.patch_test_db_creation(self._execute_raise_database_already_exists): + with mock.patch('builtins.input', return_value='no'): + with self.assertRaises(SystemExit): + # SystemExit is raised if the user answers "no" to the + # prompt asking if it's okay to delete the test database. + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False) + # "Database already exists" error is ignored when keepdb is on + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True) + # Simulate test database creation raising unexpected error + with self.patch_test_db_creation(self._execute_raise_permission_denied): + with self.assertRaises(SystemExit): + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False) + with self.assertRaises(SystemExit): + creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True) diff --git a/tests/backends/test_postgresql.py b/tests/backends/postgresql/test_server_side_cursors.py similarity index 96% rename from tests/backends/test_postgresql.py rename to tests/backends/postgresql/test_server_side_cursors.py index f4020f4e5c..8576686211 100644 --- a/tests/backends/test_postgresql.py +++ b/tests/backends/postgresql/test_server_side_cursors.py @@ -6,10 +6,10 @@ from contextlib import contextmanager from django.db import connection from django.test import TestCase -from .models import Person +from ..models import Person -@unittest.skipUnless(connection.vendor == 'postgresql', "Test only for PostgreSQL") +@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL tests') class ServerSideCursorsPostgres(TestCase): cursor_fields = 'name, statement, is_holdable, is_binary, is_scrollable, creation_time' PostgresCursor = namedtuple('PostgresCursor', cursor_fields) diff --git a/tests/backends/postgresql/tests.py b/tests/backends/postgresql/tests.py new file mode 100644 index 0000000000..140fbbc444 --- /dev/null +++ b/tests/backends/postgresql/tests.py @@ -0,0 +1,147 @@ +import unittest +import warnings +from unittest import mock + +from django.db import DatabaseError, connection +from django.test import TestCase + + +@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL tests') +class Tests(TestCase): + + def test_nodb_connection(self): + """ + The _nodb_connection property fallbacks to the default connection + database when access to the 'postgres' database is not granted. + """ + def mocked_connect(self): + if self.settings_dict['NAME'] is None: + raise DatabaseError() + return '' + + nodb_conn = connection._nodb_connection + self.assertIsNone(nodb_conn.settings_dict['NAME']) + + # Now assume the 'postgres' db isn't available + with warnings.catch_warnings(record=True) as w: + with mock.patch('django.db.backends.base.base.BaseDatabaseWrapper.connect', + side_effect=mocked_connect, autospec=True): + warnings.simplefilter('always', RuntimeWarning) + nodb_conn = connection._nodb_connection + self.assertIsNotNone(nodb_conn.settings_dict['NAME']) + self.assertEqual(nodb_conn.settings_dict['NAME'], connection.settings_dict['NAME']) + # Check a RuntimeWarning has been emitted + self.assertEqual(len(w), 1) + self.assertEqual(w[0].message.__class__, RuntimeWarning) + + def test_connect_and_rollback(self): + """ + PostgreSQL shouldn't roll back SET TIME ZONE, even if the first + transaction is rolled back (#17062). + """ + new_connection = connection.copy() + try: + # Ensure the database default time zone is different than + # the time zone in new_connection.settings_dict. We can + # get the default time zone by reset & show. + cursor = new_connection.cursor() + cursor.execute("RESET TIMEZONE") + cursor.execute("SHOW TIMEZONE") + db_default_tz = cursor.fetchone()[0] + new_tz = 'Europe/Paris' if db_default_tz == 'UTC' else 'UTC' + new_connection.close() + + # Invalidate timezone name cache, because the setting_changed + # handler cannot know about new_connection. + del new_connection.timezone_name + + # Fetch a new connection with the new_tz as default + # time zone, run a query and rollback. + with self.settings(TIME_ZONE=new_tz): + new_connection.set_autocommit(False) + cursor = new_connection.cursor() + new_connection.rollback() + + # Now let's see if the rollback rolled back the SET TIME ZONE. + cursor.execute("SHOW TIMEZONE") + tz = cursor.fetchone()[0] + self.assertEqual(new_tz, tz) + + finally: + new_connection.close() + + def test_connect_non_autocommit(self): + """ + The connection wrapper shouldn't believe that autocommit is enabled + after setting the time zone when AUTOCOMMIT is False (#21452). + """ + new_connection = connection.copy() + new_connection.settings_dict['AUTOCOMMIT'] = False + + try: + # Open a database connection. + new_connection.cursor() + self.assertFalse(new_connection.get_autocommit()) + finally: + new_connection.close() + + def test_connect_isolation_level(self): + """ + The transaction level can be configured with + DATABASES ['OPTIONS']['isolation_level']. + """ + import psycopg2 + from psycopg2.extensions import ( + ISOLATION_LEVEL_READ_COMMITTED as read_committed, + ISOLATION_LEVEL_SERIALIZABLE as serializable, + ) + # Since this is a django.test.TestCase, a transaction is in progress + # and the isolation level isn't reported as 0. This test assumes that + # PostgreSQL is configured with the default isolation level. + + # Check the level on the psycopg2 connection, not the Django wrapper. + default_level = read_committed if psycopg2.__version__ < '2.7' else None + self.assertEqual(connection.connection.isolation_level, default_level) + + new_connection = connection.copy() + new_connection.settings_dict['OPTIONS']['isolation_level'] = serializable + try: + # Start a transaction so the isolation level isn't reported as 0. + new_connection.set_autocommit(False) + # Check the level on the psycopg2 connection, not the Django wrapper. + self.assertEqual(new_connection.connection.isolation_level, serializable) + finally: + new_connection.close() + + def _select(self, val): + with connection.cursor() as cursor: + cursor.execute('SELECT %s', (val,)) + return cursor.fetchone()[0] + + def test_select_ascii_array(self): + a = ['awef'] + b = self._select(a) + self.assertEqual(a[0], b[0]) + + def test_select_unicode_array(self): + a = ['ᄲawef'] + b = self._select(a) + self.assertEqual(a[0], b[0]) + + def test_lookup_cast(self): + from django.db.backends.postgresql.operations import DatabaseOperations + do = DatabaseOperations(connection=None) + lookups = ( + 'iexact', 'contains', 'icontains', 'startswith', 'istartswith', + 'endswith', 'iendswith', 'regex', 'iregex', + ) + for lookup in lookups: + with self.subTest(lookup=lookup): + self.assertIn('::text', do.lookup_cast(lookup)) + + def test_correct_extraction_psycopg2_version(self): + from django.db.backends.postgresql.base import psycopg2_version + with mock.patch('psycopg2.__version__', '4.2.1 (dt dec pq3 ext lo64)'): + self.assertEqual(psycopg2_version(), (4, 2, 1)) + with mock.patch('psycopg2.__version__', '4.2b0.dev1 (dt dec pq3 ext lo64)'): + self.assertEqual(psycopg2_version(), (4, 2)) diff --git a/tests/backends/sqlite/__init__.py b/tests/backends/sqlite/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/backends/sqlite/tests.py b/tests/backends/sqlite/tests.py new file mode 100644 index 0000000000..838835ccdd --- /dev/null +++ b/tests/backends/sqlite/tests.py @@ -0,0 +1,138 @@ +import re +import threading +import unittest + +from django.core.exceptions import ImproperlyConfigured +from django.db import connection +from django.db.models import Avg, StdDev, Sum, Variance +from django.test import ( + TestCase, TransactionTestCase, override_settings, skipUnlessDBFeature, +) + +from ..models import Item, Object, Square + + +@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests') +class Tests(TestCase): + longMessage = True + + def test_autoincrement(self): + """ + auto_increment fields are created with the AUTOINCREMENT keyword + in order to be monotonically increasing (#10164). + """ + with connection.schema_editor(collect_sql=True) as editor: + editor.create_model(Square) + statements = editor.collected_sql + match = re.search('"id" ([^,]+),', statements[0]) + self.assertIsNotNone(match) + self.assertEqual( + 'integer NOT NULL PRIMARY KEY AUTOINCREMENT', + match.group(1), + 'Wrong SQL used to create an auto-increment column on SQLite' + ) + + def test_aggregation(self): + """ + Raise NotImplementedError when aggregating on date/time fields (#19360). + """ + for aggregate in (Sum, Avg, Variance, StdDev): + with self.assertRaises(NotImplementedError): + Item.objects.all().aggregate(aggregate('time')) + with self.assertRaises(NotImplementedError): + Item.objects.all().aggregate(aggregate('date')) + with self.assertRaises(NotImplementedError): + Item.objects.all().aggregate(aggregate('last_modified')) + with self.assertRaises(NotImplementedError): + Item.objects.all().aggregate( + **{'complex': aggregate('last_modified') + aggregate('last_modified')} + ) + + def test_memory_db_test_name(self): + """A named in-memory db should be allowed where supported.""" + from django.db.backends.sqlite3.base import DatabaseWrapper + settings_dict = { + 'TEST': { + 'NAME': 'file:memorydb_test?mode=memory&cache=shared', + } + } + wrapper = DatabaseWrapper(settings_dict) + creation = wrapper.creation + if creation.connection.features.can_share_in_memory_db: + expected = creation.connection.settings_dict['TEST']['NAME'] + self.assertEqual(creation._get_test_db_name(), expected) + else: + msg = ( + "Using a shared memory database with `mode=memory` in the " + "database name is not supported in your environment, " + "use `:memory:` instead." + ) + with self.assertRaisesMessage(ImproperlyConfigured, msg): + creation._get_test_db_name() + + +@unittest.skipUnless(connection.vendor == 'sqlite', 'Test only for SQLite') +@override_settings(DEBUG=True) +class LastExecutedQueryTest(TestCase): + + def test_no_interpolation(self): + # This shouldn't raise an exception (#17158) + query = "SELECT strftime('%Y', 'now');" + connection.cursor().execute(query) + self.assertEqual(connection.queries[-1]['sql'], query) + + def test_parameter_quoting(self): + # The implementation of last_executed_queries isn't optimal. It's + # worth testing that parameters are quoted (#14091). + query = "SELECT %s" + params = ["\"'\\"] + connection.cursor().execute(query, params) + # Note that the single quote is repeated + substituted = "SELECT '\"''\\'" + self.assertEqual(connection.queries[-1]['sql'], substituted) + + def test_large_number_of_parameters(self): + # If SQLITE_MAX_VARIABLE_NUMBER (default = 999) has been changed to be + # greater than SQLITE_MAX_COLUMN (default = 2000), last_executed_query + # can hit the SQLITE_MAX_COLUMN limit (#26063). + cursor = connection.cursor() + sql = "SELECT MAX(%s)" % ", ".join(["%s"] * 2001) + params = list(range(2001)) + # This should not raise an exception. + cursor.db.ops.last_executed_query(cursor.cursor, sql, params) + + +@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests') +class EscapingChecks(TestCase): + """ + All tests in this test case are also run with settings.DEBUG=True in + EscapingChecksDebug test case, to also test CursorDebugWrapper. + """ + def test_parameter_escaping(self): + # '%s' escaping support for sqlite3 (#13648). + cursor = connection.cursor() + cursor.execute("select strftime('%s', date('now'))") + response = cursor.fetchall()[0][0] + # response should be an non-zero integer + self.assertTrue(int(response)) + + +@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests') +@override_settings(DEBUG=True) +class EscapingChecksDebug(EscapingChecks): + pass + + +@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests') +@skipUnlessDBFeature('can_share_in_memory_db') +class ThreadSharing(TransactionTestCase): + available_apps = ['backends'] + + def test_database_sharing_in_threads(self): + def create_object(): + Object.objects.create() + create_object() + thread = threading.Thread(target=create_object) + thread.start() + thread.join() + self.assertEqual(Object.objects.count(), 2) diff --git a/tests/backends/test_creation.py b/tests/backends/test_creation.py deleted file mode 100644 index 7a18a8d19b..0000000000 --- a/tests/backends/test_creation.py +++ /dev/null @@ -1,244 +0,0 @@ -import copy -import unittest -from contextlib import contextmanager -from io import StringIO -from unittest import mock - -from django.db import DEFAULT_DB_ALIAS, connection, connections -from django.db.backends.base.creation import ( - TEST_DATABASE_PREFIX, BaseDatabaseCreation, -) -from django.db.backends.mysql.creation import ( - DatabaseCreation as MySQLDatabaseCreation, -) -from django.db.backends.oracle.creation import ( - DatabaseCreation as OracleDatabaseCreation, -) -from django.db.utils import DatabaseError -from django.test import SimpleTestCase, TestCase - -try: - import psycopg2 # NOQA -except ImportError: - pass -else: - from psycopg2 import errorcodes - from django.db.backends.postgresql.creation import \ - DatabaseCreation as PostgreSQLDatabaseCreation - - -class TestDbSignatureTests(SimpleTestCase): - - def get_connection_copy(self): - # Get a copy of the default connection. (Can't use django.db.connection - # because it'll modify the default connection itself.) - test_connection = copy.copy(connections[DEFAULT_DB_ALIAS]) - test_connection.settings_dict = copy.copy(connections[DEFAULT_DB_ALIAS].settings_dict) - return test_connection - - def test_default_name(self): - # A test db name isn't set. - prod_name = 'hodor' - test_connection = self.get_connection_copy() - test_connection.settings_dict['NAME'] = prod_name - test_connection.settings_dict['TEST'] = {'NAME': None} - signature = BaseDatabaseCreation(test_connection).test_db_signature() - self.assertEqual(signature[3], TEST_DATABASE_PREFIX + prod_name) - - def test_custom_test_name(self): - # A regular test db name is set. - test_name = 'hodor' - test_connection = self.get_connection_copy() - test_connection.settings_dict['TEST'] = {'NAME': test_name} - signature = BaseDatabaseCreation(test_connection).test_db_signature() - self.assertEqual(signature[3], test_name) - - def test_custom_test_name_with_test_prefix(self): - # A test db name prefixed with TEST_DATABASE_PREFIX is set. - test_name = TEST_DATABASE_PREFIX + 'hodor' - test_connection = self.get_connection_copy() - test_connection.settings_dict['TEST'] = {'NAME': test_name} - signature = BaseDatabaseCreation(test_connection).test_db_signature() - self.assertEqual(signature[3], test_name) - - -@unittest.skipUnless(connection.vendor == 'postgresql', "PostgreSQL-specific tests") -class PostgreSQLDatabaseCreationTests(SimpleTestCase): - - @contextmanager - def changed_test_settings(self, **kwargs): - settings = connection.settings_dict['TEST'] - saved_values = {} - for name in kwargs: - if name in settings: - saved_values[name] = settings[name] - - for name, value in kwargs.items(): - settings[name] = value - try: - yield - finally: - for name, value in kwargs.items(): - if name in saved_values: - settings[name] = saved_values[name] - else: - del settings[name] - - def check_sql_table_creation_suffix(self, settings, expected): - with self.changed_test_settings(**settings): - creation = PostgreSQLDatabaseCreation(connection) - suffix = creation.sql_table_creation_suffix() - self.assertEqual(suffix, expected) - - def test_sql_table_creation_suffix_with_none_settings(self): - settings = {'CHARSET': None, 'TEMPLATE': None} - self.check_sql_table_creation_suffix(settings, "") - - def test_sql_table_creation_suffix_with_encoding(self): - settings = {'CHARSET': 'UTF8'} - self.check_sql_table_creation_suffix(settings, "WITH ENCODING 'UTF8'") - - def test_sql_table_creation_suffix_with_template(self): - settings = {'TEMPLATE': 'template0'} - self.check_sql_table_creation_suffix(settings, 'WITH TEMPLATE "template0"') - - def test_sql_table_creation_suffix_with_encoding_and_template(self): - settings = {'CHARSET': 'UTF8', 'TEMPLATE': 'template0'} - self.check_sql_table_creation_suffix(settings, '''WITH ENCODING 'UTF8' TEMPLATE "template0"''') - - def _execute_raise_database_already_exists(self, cursor, parameters, keepdb=False): - error = DatabaseError('database %s already exists' % parameters['dbname']) - error.pgcode = errorcodes.DUPLICATE_DATABASE - raise DatabaseError() from error - - def _execute_raise_permission_denied(self, cursor, parameters, keepdb=False): - error = DatabaseError('permission denied to create database') - error.pgcode = errorcodes.INSUFFICIENT_PRIVILEGE - raise DatabaseError() from error - - def patch_test_db_creation(self, execute_create_test_db): - return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db) - - @mock.patch('sys.stdout', new_callable=StringIO) - @mock.patch('sys.stderr', new_callable=StringIO) - def test_create_test_db(self, *mocked_objects): - creation = PostgreSQLDatabaseCreation(connection) - # Simulate test database creation raising "database already exists" - with self.patch_test_db_creation(self._execute_raise_database_already_exists): - with mock.patch('builtins.input', return_value='no'): - with self.assertRaises(SystemExit): - # SystemExit is raised if the user answers "no" to the - # prompt asking if it's okay to delete the test database. - creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False) - # "Database already exists" error is ignored when keepdb is on - creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True) - # Simulate test database creation raising unexpected error - with self.patch_test_db_creation(self._execute_raise_permission_denied): - with self.assertRaises(SystemExit): - creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False) - with self.assertRaises(SystemExit): - creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True) - - -@unittest.skipUnless(connection.vendor == 'oracle', "Oracle specific tests") -@mock.patch.object(OracleDatabaseCreation, '_maindb_connection', return_value=connection) -@mock.patch('sys.stdout', new_callable=StringIO) -@mock.patch('sys.stderr', new_callable=StringIO) -class OracleDatabaseCreationTests(TestCase): - - def _execute_raise_user_already_exists(self, cursor, statements, parameters, verbosity, allow_quiet_fail=False): - # Raise "user already exists" only in test user creation - if statements and statements[0].startswith('CREATE USER'): - raise DatabaseError("ORA-01920: user name 'string' conflicts with another user or role name") - - def _execute_raise_tablespace_already_exists( - self, cursor, statements, parameters, verbosity, allow_quiet_fail=False - ): - raise DatabaseError("ORA-01543: tablespace 'string' already exists") - - def _execute_raise_insufficient_privileges( - self, cursor, statements, parameters, verbosity, allow_quiet_fail=False - ): - raise DatabaseError("ORA-01031: insufficient privileges") - - def _test_database_passwd(self): - # Mocked to avoid test user password changed - return connection.settings_dict['SAVED_PASSWORD'] - - def patch_execute_statements(self, execute_statements): - return mock.patch.object(OracleDatabaseCreation, '_execute_statements', execute_statements) - - @mock.patch.object(OracleDatabaseCreation, '_test_user_create', return_value=False) - def test_create_test_db(self, *mocked_objects): - creation = OracleDatabaseCreation(connection) - # Simulate test database creation raising "tablespace already exists" - with self.patch_execute_statements(self._execute_raise_tablespace_already_exists): - with mock.patch('builtins.input', return_value='no'): - with self.assertRaises(SystemExit): - # SystemExit is raised if the user answers "no" to the - # prompt asking if it's okay to delete the test tablespace. - creation._create_test_db(verbosity=0, keepdb=False) - # "Tablespace already exists" error is ignored when keepdb is on - creation._create_test_db(verbosity=0, keepdb=True) - # Simulate test database creation raising unexpected error - with self.patch_execute_statements(self._execute_raise_insufficient_privileges): - with self.assertRaises(SystemExit): - creation._create_test_db(verbosity=0, keepdb=False) - with self.assertRaises(SystemExit): - creation._create_test_db(verbosity=0, keepdb=True) - - @mock.patch.object(OracleDatabaseCreation, '_test_database_create', return_value=False) - def test_create_test_user(self, *mocked_objects): - creation = OracleDatabaseCreation(connection) - with mock.patch.object(OracleDatabaseCreation, '_test_database_passwd', self._test_database_passwd): - # Simulate test user creation raising "user already exists" - with self.patch_execute_statements(self._execute_raise_user_already_exists): - with mock.patch('builtins.input', return_value='no'): - with self.assertRaises(SystemExit): - # SystemExit is raised if the user answers "no" to the - # prompt asking if it's okay to delete the test user. - creation._create_test_db(verbosity=0, keepdb=False) - # "User already exists" error is ignored when keepdb is on - creation._create_test_db(verbosity=0, keepdb=True) - # Simulate test user creation raising unexpected error - with self.patch_execute_statements(self._execute_raise_insufficient_privileges): - with self.assertRaises(SystemExit): - creation._create_test_db(verbosity=0, keepdb=False) - with self.assertRaises(SystemExit): - creation._create_test_db(verbosity=0, keepdb=True) - - -@unittest.skipUnless(connection.vendor == 'mysql', "MySQL specific tests") -class MySQLDatabaseCreationTests(SimpleTestCase): - - def _execute_raise_database_exists(self, cursor, parameters, keepdb=False): - raise DatabaseError(1007, "Can't create database '%s'; database exists" % parameters['dbname']) - - def _execute_raise_access_denied(self, cursor, parameters, keepdb=False): - raise DatabaseError(1044, "Access denied for user") - - def patch_test_db_creation(self, execute_create_test_db): - return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db) - - @mock.patch('sys.stdout', new_callable=StringIO) - @mock.patch('sys.stderr', new_callable=StringIO) - def test_create_test_db_database_exists(self, *mocked_objects): - # Simulate test database creation raising "database exists" - creation = MySQLDatabaseCreation(connection) - with self.patch_test_db_creation(self._execute_raise_database_exists): - with mock.patch('builtins.input', return_value='no'): - with self.assertRaises(SystemExit): - # SystemExit is raised if the user answers "no" to the - # prompt asking if it's okay to delete the test database. - creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False) - # "Database exists" shouldn't appear when keepdb is on - creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True) - - @mock.patch('sys.stdout', new_callable=StringIO) - @mock.patch('sys.stderr', new_callable=StringIO) - def test_create_test_db_unexpected_error(self, *mocked_objects): - # Simulate test database creation raising unexpected error - creation = MySQLDatabaseCreation(connection) - with self.patch_test_db_creation(self._execute_raise_access_denied): - with self.assertRaises(SystemExit): - creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False) diff --git a/tests/backends/test_utils.py b/tests/backends/test_utils.py index 60429c5543..2ef1e4b9f7 100644 --- a/tests/backends/test_utils.py +++ b/tests/backends/test_utils.py @@ -1,22 +1,11 @@ -import unittest +"""Tests for django.db.backends.utils""" +from decimal import Decimal, Rounded -from django.core.exceptions import ImproperlyConfigured -from django.db import connection -from django.db.backends.utils import truncate_name -from django.db.utils import ProgrammingError, load_backend -from django.test import SimpleTestCase, TestCase +from django.db.backends.utils import format_number, truncate_name +from django.test import SimpleTestCase -class TestLoadBackend(SimpleTestCase): - def test_load_backend_invalid_name(self): - msg = ( - "'foo' isn't an available database backend.\n" - "Try using 'django.db.backends.XXX', where XXX is one of:\n" - " 'mysql', 'oracle', 'postgresql', 'sqlite3'" - ) - with self.assertRaisesMessage(ImproperlyConfigured, msg) as cm: - load_backend('foo') - self.assertEqual(str(cm.exception.__cause__), "No module named 'foo'") +class TestUtils(SimpleTestCase): def test_truncate_name(self): self.assertEqual(truncate_name('some_table', 10), 'some_table') @@ -28,15 +17,31 @@ class TestLoadBackend(SimpleTestCase): self.assertEqual(truncate_name('username"."some_long_table', 10), 'username"."some_la38a') self.assertEqual(truncate_name('username"."some_long_table', 10, 3), 'username"."some_loa38') + def test_format_number(self): + def equal(value, max_d, places, result): + self.assertEqual(format_number(Decimal(value), max_d, places), result) -@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests') -class TestDatabaseErrorWrapper(TestCase): - def test_reraising_backend_specific_database_exception(self): - cursor = connection.cursor() - msg = 'table "X" does not exist' - with self.assertRaisesMessage(ProgrammingError, msg) as cm: - cursor.execute('DROP TABLE "X"') - self.assertNotEqual(type(cm.exception), type(cm.exception.__cause__)) - self.assertIsNotNone(cm.exception.__cause__) - self.assertIsNotNone(cm.exception.__cause__.pgcode) - self.assertIsNotNone(cm.exception.__cause__.pgerror) + equal('0', 12, 3, '0.000') + equal('0', 12, 8, '0.00000000') + equal('1', 12, 9, '1.000000000') + equal('0.00000000', 12, 8, '0.00000000') + equal('0.000000004', 12, 8, '0.00000000') + equal('0.000000008', 12, 8, '0.00000001') + equal('0.000000000000000000999', 10, 8, '0.00000000') + equal('0.1234567890', 12, 10, '0.1234567890') + equal('0.1234567890', 12, 9, '0.123456789') + equal('0.1234567890', 12, 8, '0.12345679') + equal('0.1234567890', 12, 5, '0.12346') + equal('0.1234567890', 12, 3, '0.123') + equal('0.1234567890', 12, 1, '0.1') + equal('0.1234567890', 12, 0, '0') + equal('0.1234567890', None, 0, '0') + equal('1234567890.1234567890', None, 0, '1234567890') + equal('1234567890.1234567890', None, 2, '1234567890.12') + equal('0.1234', 5, None, '0.1234') + equal('123.12', 5, None, '123.12') + + with self.assertRaises(Rounded): + equal('0.1234567890', 5, None, '0.12346') + with self.assertRaises(Rounded): + equal('1234567890.1234', 5, None, '1234600000') diff --git a/tests/backends/tests.py b/tests/backends/tests.py index 8847b178ef..6d38625a98 100644 --- a/tests/backends/tests.py +++ b/tests/backends/tests.py @@ -1,13 +1,9 @@ -# Unit and doctests for specific database backends. +"""Tests related to django.db.backends that haven't been organized.""" import datetime -import re import threading import unittest import warnings -from decimal import Decimal, Rounded -from unittest import mock -from django.core.exceptions import ImproperlyConfigured from django.core.management.color import no_style from django.db import ( DEFAULT_DB_ALIAS, DatabaseError, IntegrityError, connection, connections, @@ -15,322 +11,20 @@ from django.db import ( ) from django.db.backends.base.base import BaseDatabaseWrapper from django.db.backends.signals import connection_created -from django.db.backends.utils import CursorWrapper, format_number -from django.db.models import Avg, StdDev, Sum, Variance +from django.db.backends.utils import CursorWrapper from django.db.models.sql.constants import CURSOR -from django.db.utils import ConnectionHandler from django.test import ( - SimpleTestCase, TestCase, TransactionTestCase, override_settings, - skipIfDBFeature, skipUnlessDBFeature, + TestCase, TransactionTestCase, override_settings, skipIfDBFeature, + skipUnlessDBFeature, ) from .models import ( - Article, Item, Object, ObjectReference, Person, Post, RawData, Reporter, + Article, Object, ObjectReference, Person, Post, RawData, Reporter, ReporterProxy, SchoolClass, Square, VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ, ) -class DatabaseWrapperTests(SimpleTestCase): - - def test_initialization_class_attributes(self): - """ - The "initialization" class attributes like client_class and - creation_class should be set on the class and reflected in the - corresponding instance attributes of the instantiated backend. - """ - conn = connections[DEFAULT_DB_ALIAS] - conn_class = type(conn) - attr_names = [ - ('client_class', 'client'), - ('creation_class', 'creation'), - ('features_class', 'features'), - ('introspection_class', 'introspection'), - ('ops_class', 'ops'), - ('validation_class', 'validation'), - ] - for class_attr_name, instance_attr_name in attr_names: - class_attr_value = getattr(conn_class, class_attr_name) - self.assertIsNotNone(class_attr_value) - instance_attr_value = getattr(conn, instance_attr_name) - self.assertIsInstance(instance_attr_value, class_attr_value) - - def test_initialization_display_name(self): - self.assertEqual(BaseDatabaseWrapper.display_name, 'unknown') - self.assertNotEqual(connection.display_name, 'unknown') - - -class DummyBackendTest(SimpleTestCase): - - def test_no_databases(self): - """ - Empty DATABASES setting default to the dummy backend. - """ - DATABASES = {} - conns = ConnectionHandler(DATABASES) - self.assertEqual(conns[DEFAULT_DB_ALIAS].settings_dict['ENGINE'], 'django.db.backends.dummy') - with self.assertRaises(ImproperlyConfigured): - conns[DEFAULT_DB_ALIAS].ensure_connection() - - -@unittest.skipUnless(connection.vendor == 'oracle', "Test only for Oracle") -class OracleTests(unittest.TestCase): - - def test_quote_name(self): - # '%' chars are escaped for query execution. - name = '"SOME%NAME"' - quoted_name = connection.ops.quote_name(name) - self.assertEqual(quoted_name % (), name) - - def test_dbms_session(self): - # If the backend is Oracle, test that we can call a standard - # stored procedure through our cursor wrapper. - with connection.cursor() as cursor: - cursor.callproc('DBMS_SESSION.SET_IDENTIFIER', ['_django_testing!']) - - def test_cursor_var(self): - # If the backend is Oracle, test that we can pass cursor variables - # as query parameters. - from django.db.backends.oracle.base import Database - - with connection.cursor() as cursor: - var = cursor.var(Database.STRING) - cursor.execute("BEGIN %s := 'X'; END; ", [var]) - self.assertEqual(var.getvalue(), 'X') - - def test_long_string(self): - # If the backend is Oracle, test that we can save a text longer - # than 4000 chars and read it properly - with connection.cursor() as cursor: - cursor.execute('CREATE TABLE ltext ("TEXT" NCLOB)') - long_str = ''.join(str(x) for x in range(4000)) - cursor.execute('INSERT INTO ltext VALUES (%s)', [long_str]) - cursor.execute('SELECT text FROM ltext') - row = cursor.fetchone() - self.assertEqual(long_str, row[0].read()) - cursor.execute('DROP TABLE ltext') - - def test_client_encoding(self): - # If the backend is Oracle, test that the client encoding is set - # correctly. This was broken under Cygwin prior to r14781. - connection.ensure_connection() - self.assertEqual(connection.connection.encoding, "UTF-8") - self.assertEqual(connection.connection.nencoding, "UTF-8") - - def test_order_of_nls_parameters(self): - # an 'almost right' datetime should work with configured - # NLS parameters as per #18465. - with connection.cursor() as cursor: - query = "select 1 from dual where '1936-12-29 00:00' < sysdate" - # The query succeeds without errors - pre #18465 this - # wasn't the case. - cursor.execute(query) - self.assertEqual(cursor.fetchone()[0], 1) - - -@unittest.skipUnless(connection.vendor == 'sqlite', "Test only for SQLite") -class SQLiteTests(TestCase): - - longMessage = True - - def test_autoincrement(self): - """ - auto_increment fields are created with the AUTOINCREMENT keyword - in order to be monotonically increasing. Refs #10164. - """ - with connection.schema_editor(collect_sql=True) as editor: - editor.create_model(Square) - statements = editor.collected_sql - match = re.search('"id" ([^,]+),', statements[0]) - self.assertIsNotNone(match) - self.assertEqual( - 'integer NOT NULL PRIMARY KEY AUTOINCREMENT', - match.group(1), - "Wrong SQL used to create an auto-increment column on SQLite" - ) - - def test_aggregation(self): - """ - #19360: Raise NotImplementedError when aggregating on date/time fields. - """ - for aggregate in (Sum, Avg, Variance, StdDev): - with self.assertRaises(NotImplementedError): - Item.objects.all().aggregate(aggregate('time')) - with self.assertRaises(NotImplementedError): - Item.objects.all().aggregate(aggregate('date')) - with self.assertRaises(NotImplementedError): - Item.objects.all().aggregate(aggregate('last_modified')) - with self.assertRaises(NotImplementedError): - Item.objects.all().aggregate( - **{'complex': aggregate('last_modified') + aggregate('last_modified')} - ) - - def test_memory_db_test_name(self): - """ - A named in-memory db should be allowed where supported. - """ - from django.db.backends.sqlite3.base import DatabaseWrapper - settings_dict = { - 'TEST': { - 'NAME': 'file:memorydb_test?mode=memory&cache=shared', - } - } - wrapper = DatabaseWrapper(settings_dict) - creation = wrapper.creation - if creation.connection.features.can_share_in_memory_db: - expected = creation.connection.settings_dict['TEST']['NAME'] - self.assertEqual(creation._get_test_db_name(), expected) - else: - msg = ( - "Using a shared memory database with `mode=memory` in the " - "database name is not supported in your environment, " - "use `:memory:` instead." - ) - with self.assertRaisesMessage(ImproperlyConfigured, msg): - creation._get_test_db_name() - - -@unittest.skipUnless(connection.vendor == 'postgresql', "Test only for PostgreSQL") -class PostgreSQLTests(TestCase): - - def test_nodb_connection(self): - """ - The _nodb_connection property fallbacks to the default connection - database when access to the 'postgres' database is not granted. - """ - def mocked_connect(self): - if self.settings_dict['NAME'] is None: - raise DatabaseError() - return '' - - nodb_conn = connection._nodb_connection - self.assertIsNone(nodb_conn.settings_dict['NAME']) - - # Now assume the 'postgres' db isn't available - with warnings.catch_warnings(record=True) as w: - with mock.patch('django.db.backends.base.base.BaseDatabaseWrapper.connect', - side_effect=mocked_connect, autospec=True): - warnings.simplefilter('always', RuntimeWarning) - nodb_conn = connection._nodb_connection - self.assertIsNotNone(nodb_conn.settings_dict['NAME']) - self.assertEqual(nodb_conn.settings_dict['NAME'], connection.settings_dict['NAME']) - # Check a RuntimeWarning has been emitted - self.assertEqual(len(w), 1) - self.assertEqual(w[0].message.__class__, RuntimeWarning) - - def test_connect_and_rollback(self): - """ - PostgreSQL shouldn't roll back SET TIME ZONE, even if the first - transaction is rolled back (#17062). - """ - new_connection = connection.copy() - - try: - # Ensure the database default time zone is different than - # the time zone in new_connection.settings_dict. We can - # get the default time zone by reset & show. - cursor = new_connection.cursor() - cursor.execute("RESET TIMEZONE") - cursor.execute("SHOW TIMEZONE") - db_default_tz = cursor.fetchone()[0] - new_tz = 'Europe/Paris' if db_default_tz == 'UTC' else 'UTC' - new_connection.close() - - # Invalidate timezone name cache, because the setting_changed - # handler cannot know about new_connection. - del new_connection.timezone_name - - # Fetch a new connection with the new_tz as default - # time zone, run a query and rollback. - with self.settings(TIME_ZONE=new_tz): - new_connection.set_autocommit(False) - cursor = new_connection.cursor() - new_connection.rollback() - - # Now let's see if the rollback rolled back the SET TIME ZONE. - cursor.execute("SHOW TIMEZONE") - tz = cursor.fetchone()[0] - self.assertEqual(new_tz, tz) - - finally: - new_connection.close() - - def test_connect_non_autocommit(self): - """ - The connection wrapper shouldn't believe that autocommit is enabled - after setting the time zone when AUTOCOMMIT is False (#21452). - """ - new_connection = connection.copy() - new_connection.settings_dict['AUTOCOMMIT'] = False - - try: - # Open a database connection. - new_connection.cursor() - self.assertFalse(new_connection.get_autocommit()) - finally: - new_connection.close() - - def test_connect_isolation_level(self): - """ - Regression test for #18130 and #24318. - """ - import psycopg2 - from psycopg2.extensions import ( - ISOLATION_LEVEL_READ_COMMITTED as read_committed, - ISOLATION_LEVEL_SERIALIZABLE as serializable, - ) - - # Since this is a django.test.TestCase, a transaction is in progress - # and the isolation level isn't reported as 0. This test assumes that - # PostgreSQL is configured with the default isolation level. - - # Check the level on the psycopg2 connection, not the Django wrapper. - default_level = read_committed if psycopg2.__version__ < '2.7' else None - self.assertEqual(connection.connection.isolation_level, default_level) - - new_connection = connection.copy() - new_connection.settings_dict['OPTIONS']['isolation_level'] = serializable - try: - # Start a transaction so the isolation level isn't reported as 0. - new_connection.set_autocommit(False) - # Check the level on the psycopg2 connection, not the Django wrapper. - self.assertEqual(new_connection.connection.isolation_level, serializable) - finally: - new_connection.close() - - def _select(self, val): - with connection.cursor() as cursor: - cursor.execute("SELECT %s", (val,)) - return cursor.fetchone()[0] - - def test_select_ascii_array(self): - a = ["awef"] - b = self._select(a) - self.assertEqual(a[0], b[0]) - - def test_select_unicode_array(self): - a = ["ᄲawef"] - b = self._select(a) - self.assertEqual(a[0], b[0]) - - def test_lookup_cast(self): - from django.db.backends.postgresql.operations import DatabaseOperations - - do = DatabaseOperations(connection=None) - for lookup in ('iexact', 'contains', 'icontains', 'startswith', - 'istartswith', 'endswith', 'iendswith', 'regex', 'iregex'): - self.assertIn('::text', do.lookup_cast(lookup)) - - def test_correct_extraction_psycopg2_version(self): - from django.db.backends.postgresql.base import psycopg2_version - - with mock.patch('psycopg2.__version__', '4.2.1 (dt dec pq3 ext lo64)'): - self.assertEqual(psycopg2_version(), (4, 2, 1)) - - with mock.patch('psycopg2.__version__', '4.2b0.dev1 (dt dec pq3 ext lo64)'): - self.assertEqual(psycopg2_version(), (4, 2)) - - class DateQuotingTest(TestCase): def test_django_date_trunc(self): @@ -379,38 +73,6 @@ class LastExecutedQueryTest(TestCase): last_sql = cursor.db.ops.last_executed_query(cursor, sql, params) self.assertIsInstance(last_sql, str) - @unittest.skipUnless(connection.vendor == 'sqlite', - "This test is specific to SQLite.") - def test_no_interpolation_on_sqlite(self): - # This shouldn't raise an exception (##17158) - query = "SELECT strftime('%Y', 'now');" - connection.cursor().execute(query) - self.assertEqual(connection.queries[-1]['sql'], query) - - @unittest.skipUnless(connection.vendor == 'sqlite', - "This test is specific to SQLite.") - def test_parameter_quoting_on_sqlite(self): - # The implementation of last_executed_queries isn't optimal. It's - # worth testing that parameters are quoted. See #14091. - query = "SELECT %s" - params = ["\"'\\"] - connection.cursor().execute(query, params) - # Note that the single quote is repeated - substituted = "SELECT '\"''\\'" - self.assertEqual(connection.queries[-1]['sql'], substituted) - - @unittest.skipUnless(connection.vendor == 'sqlite', - "This test is specific to SQLite.") - def test_large_number_of_parameters_on_sqlite(self): - # If SQLITE_MAX_VARIABLE_NUMBER (default = 999) has been changed to be - # greater than SQLITE_MAX_COLUMN (default = 2000), last_executed_query - # can hit the SQLITE_MAX_COLUMN limit. See #26063. - cursor = connection.cursor() - sql = "SELECT MAX(%s)" % ", ".join(["%s"] * 2001) - params = list(range(2001)) - # This should not raise an exception. - cursor.db.ops.last_executed_query(cursor.cursor, sql, params) - class ParameterHandlingTest(TestCase): @@ -539,16 +201,6 @@ class EscapingChecks(TestCase): cursor.execute("SELECT '%%', %s" + self.bare_select_suffix, ('%d',)) self.assertEqual(cursor.fetchall()[0], ('%', '%d')) - @unittest.skipUnless(connection.vendor == 'sqlite', - "This is an sqlite-specific issue") - def test_sqlite_parameter_escaping(self): - # '%s' escaping support for sqlite3 #13648 - cursor = connection.cursor() - cursor.execute("select strftime('%s', date('now'))") - response = cursor.fetchall()[0][0] - # response should be an non-zero integer - self.assertTrue(int(response)) - @override_settings(DEBUG=True) class EscapingChecksDebug(EscapingChecks): @@ -1128,76 +780,3 @@ class DBConstraintTestCase(TestCase): intermediary_model.objects.create(from_object_id=obj.id, to_object_id=12345) self.assertEqual(obj.related_objects.count(), 1) self.assertEqual(intermediary_model.objects.count(), 2) - - -class BackendUtilTests(SimpleTestCase): - - def test_format_number(self): - """ - Test the format_number converter utility - """ - def equal(value, max_d, places, result): - self.assertEqual(format_number(Decimal(value), max_d, places), result) - - equal('0', 12, 3, - '0.000') - equal('0', 12, 8, - '0.00000000') - equal('1', 12, 9, - '1.000000000') - equal('0.00000000', 12, 8, - '0.00000000') - equal('0.000000004', 12, 8, - '0.00000000') - equal('0.000000008', 12, 8, - '0.00000001') - equal('0.000000000000000000999', 10, 8, - '0.00000000') - equal('0.1234567890', 12, 10, - '0.1234567890') - equal('0.1234567890', 12, 9, - '0.123456789') - equal('0.1234567890', 12, 8, - '0.12345679') - equal('0.1234567890', 12, 5, - '0.12346') - equal('0.1234567890', 12, 3, - '0.123') - equal('0.1234567890', 12, 1, - '0.1') - equal('0.1234567890', 12, 0, - '0') - equal('0.1234567890', None, 0, - '0') - equal('1234567890.1234567890', None, 0, - '1234567890') - equal('1234567890.1234567890', None, 2, - '1234567890.12') - equal('0.1234', 5, None, - '0.1234') - equal('123.12', 5, None, - '123.12') - with self.assertRaises(Rounded): - equal('0.1234567890', 5, None, - '0.12346') - with self.assertRaises(Rounded): - equal('1234567890.1234', 5, None, - '1234600000') - - -@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite specific test.') -@skipUnlessDBFeature('can_share_in_memory_db') -class TestSqliteThreadSharing(TransactionTestCase): - available_apps = ['backends'] - - def test_database_sharing_in_threads(self): - def create_object(): - Object.objects.create() - - create_object() - - thread = threading.Thread(target=create_object) - thread.start() - thread.join() - - self.assertEqual(Object.objects.count(), 2) diff --git a/tests/db_utils/tests.py b/tests/db_utils/tests.py new file mode 100644 index 0000000000..2a45342df5 --- /dev/null +++ b/tests/db_utils/tests.py @@ -0,0 +1,49 @@ +"""Tests for django.db.utils.""" +import unittest + +from django.core.exceptions import ImproperlyConfigured +from django.db import DEFAULT_DB_ALIAS, connection +from django.db.utils import ConnectionHandler, ProgrammingError, load_backend +from django.test import SimpleTestCase, TestCase + + +class ConnectionHandlerTests(SimpleTestCase): + + def test_connection_handler_no_databases(self): + """Empty DATABASES setting defaults to the dummy backend.""" + DATABASES = {} + conns = ConnectionHandler(DATABASES) + self.assertEqual(conns[DEFAULT_DB_ALIAS].settings_dict['ENGINE'], 'django.db.backends.dummy') + msg = ( + 'settings.DATABASES is improperly configured. Please supply the ' + 'ENGINE value. Check settings documentation for more details.' + ) + with self.assertRaisesMessage(ImproperlyConfigured, msg): + conns[DEFAULT_DB_ALIAS].ensure_connection() + + +class DatabaseErrorWrapperTests(TestCase): + + @unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL test') + def test_reraising_backend_specific_database_exception(self): + cursor = connection.cursor() + msg = 'table "X" does not exist' + with self.assertRaisesMessage(ProgrammingError, msg) as cm: + cursor.execute('DROP TABLE "X"') + self.assertNotEqual(type(cm.exception), type(cm.exception.__cause__)) + self.assertIsNotNone(cm.exception.__cause__) + self.assertIsNotNone(cm.exception.__cause__.pgcode) + self.assertIsNotNone(cm.exception.__cause__.pgerror) + + +class LoadBackendTests(SimpleTestCase): + + def test_load_backend_invalid_name(self): + msg = ( + "'foo' isn't an available database backend.\n" + "Try using 'django.db.backends.XXX', where XXX is one of:\n" + " 'mysql', 'oracle', 'postgresql', 'sqlite3'" + ) + with self.assertRaisesMessage(ImproperlyConfigured, msg) as cm: + load_backend('foo') + self.assertEqual(str(cm.exception.__cause__), "No module named 'foo'")