Reorganized backends tests.

This commit is contained in:
Mariusz Felisiak 2017-02-11 21:37:49 +01:00 committed by Tim Graham
parent 0f91ba1adc
commit 8cb1b1fd8e
21 changed files with 731 additions and 710 deletions

View File

View File

@ -0,0 +1,32 @@
from django.db import DEFAULT_DB_ALIAS, connection, connections
from django.db.backends.base.base import BaseDatabaseWrapper
from django.test import SimpleTestCase
class DatabaseWrapperTests(SimpleTestCase):
def test_initialization_class_attributes(self):
"""
The "initialization" class attributes like client_class and
creation_class should be set on the class and reflected in the
corresponding instance attributes of the instantiated backend.
"""
conn = connections[DEFAULT_DB_ALIAS]
conn_class = type(conn)
attr_names = [
('client_class', 'client'),
('creation_class', 'creation'),
('features_class', 'features'),
('introspection_class', 'introspection'),
('ops_class', 'ops'),
('validation_class', 'validation'),
]
for class_attr_name, instance_attr_name in attr_names:
class_attr_value = getattr(conn_class, class_attr_name)
self.assertIsNotNone(class_attr_value)
instance_attr_value = getattr(conn, instance_attr_name)
self.assertIsInstance(instance_attr_value, class_attr_value)
def test_initialization_display_name(self):
self.assertEqual(BaseDatabaseWrapper.display_name, 'unknown')
self.assertNotEqual(connection.display_name, 'unknown')

View File

@ -0,0 +1,42 @@
import copy
from django.db import DEFAULT_DB_ALIAS, connections
from django.db.backends.base.creation import (
TEST_DATABASE_PREFIX, BaseDatabaseCreation,
)
from django.test import SimpleTestCase
class TestDbSignatureTests(SimpleTestCase):
def get_connection_copy(self):
# Get a copy of the default connection. (Can't use django.db.connection
# because it'll modify the default connection itself.)
test_connection = copy.copy(connections[DEFAULT_DB_ALIAS])
test_connection.settings_dict = copy.copy(connections[DEFAULT_DB_ALIAS].settings_dict)
return test_connection
def test_default_name(self):
# A test db name isn't set.
prod_name = 'hodor'
test_connection = self.get_connection_copy()
test_connection.settings_dict['NAME'] = prod_name
test_connection.settings_dict['TEST'] = {'NAME': None}
signature = BaseDatabaseCreation(test_connection).test_db_signature()
self.assertEqual(signature[3], TEST_DATABASE_PREFIX + prod_name)
def test_custom_test_name(self):
# A regular test db name is set.
test_name = 'hodor'
test_connection = self.get_connection_copy()
test_connection.settings_dict['TEST'] = {'NAME': test_name}
signature = BaseDatabaseCreation(test_connection).test_db_signature()
self.assertEqual(signature[3], test_name)
def test_custom_test_name_with_test_prefix(self):
# A test db name prefixed with TEST_DATABASE_PREFIX is set.
test_name = TEST_DATABASE_PREFIX + 'hodor'
test_connection = self.get_connection_copy()
test_connection.settings_dict['TEST'] = {'NAME': test_name}
signature = BaseDatabaseCreation(test_connection).test_db_signature()
self.assertEqual(signature[3], test_name)

View File

@ -0,0 +1,8 @@
from django.db import connection
from django.test import TestCase
class TestDatabaseFeatures(TestCase):
def test_nonexistent_feature(self):
self.assertFalse(hasattr(connection.features, 'nonexistent'))

View File

View File

@ -0,0 +1,45 @@
import unittest
from io import StringIO
from unittest import mock
from django.db import connection
from django.db.backends.base.creation import BaseDatabaseCreation
from django.db.backends.mysql.creation import DatabaseCreation
from django.db.utils import DatabaseError
from django.test import SimpleTestCase
@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL tests')
class DatabaseCreationTests(SimpleTestCase):
def _execute_raise_database_exists(self, cursor, parameters, keepdb=False):
raise DatabaseError(1007, "Can't create database '%s'; database exists" % parameters['dbname'])
def _execute_raise_access_denied(self, cursor, parameters, keepdb=False):
raise DatabaseError(1044, "Access denied for user")
def patch_test_db_creation(self, execute_create_test_db):
return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db)
@mock.patch('sys.stdout', new_callable=StringIO)
@mock.patch('sys.stderr', new_callable=StringIO)
def test_create_test_db_database_exists(self, *mocked_objects):
# Simulate test database creation raising "database exists"
creation = DatabaseCreation(connection)
with self.patch_test_db_creation(self._execute_raise_database_exists):
with mock.patch('builtins.input', return_value='no'):
with self.assertRaises(SystemExit):
# SystemExit is raised if the user answers "no" to the
# prompt asking if it's okay to delete the test database.
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
# "Database exists" shouldn't appear when keepdb is on
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
@mock.patch('sys.stdout', new_callable=StringIO)
@mock.patch('sys.stderr', new_callable=StringIO)
def test_create_test_db_unexpected_error(self, *mocked_objects):
# Simulate test database creation raising unexpected error
creation = DatabaseCreation(connection)
with self.patch_test_db_creation(self._execute_raise_access_denied):
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)

View File

@ -4,16 +4,10 @@ from django.db import connection
from django.test import TestCase
class TestDatabaseFeatures(TestCase):
@skipUnless(connection.vendor == 'mysql', 'MySQL tests')
class TestFeatures(TestCase):
def test_nonexistent_feature(self):
self.assertFalse(hasattr(connection.features, 'nonexistent'))
@skipUnless(connection.vendor == 'mysql', 'MySQL backend tests')
class TestMySQLFeatures(TestCase):
def test_mysql_supports_transactions(self):
def test_supports_transactions(self):
"""
All storage engines except MyISAM support transactions.
"""

View File

@ -14,8 +14,8 @@ def get_connection():
@override_settings(DEBUG=True)
@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL specific test.')
class MySQLTests(TestCase):
@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL tests')
class IsolationLevelTests(TestCase):
read_committed = 'read committed'
repeatable_read = 'repeatable read'

View File

View File

@ -0,0 +1,76 @@
import unittest
from io import StringIO
from unittest import mock
from django.db import connection
from django.db.backends.oracle.creation import DatabaseCreation
from django.db.utils import DatabaseError
from django.test import TestCase
@unittest.skipUnless(connection.vendor == 'oracle', 'Oracle tests')
@mock.patch.object(DatabaseCreation, '_maindb_connection', return_value=connection)
@mock.patch('sys.stdout', new_callable=StringIO)
@mock.patch('sys.stderr', new_callable=StringIO)
class DatabaseCreationTests(TestCase):
def _execute_raise_user_already_exists(self, cursor, statements, parameters, verbosity, allow_quiet_fail=False):
# Raise "user already exists" only in test user creation
if statements and statements[0].startswith('CREATE USER'):
raise DatabaseError("ORA-01920: user name 'string' conflicts with another user or role name")
def _execute_raise_tablespace_already_exists(
self, cursor, statements, parameters, verbosity, allow_quiet_fail=False
):
raise DatabaseError("ORA-01543: tablespace 'string' already exists")
def _execute_raise_insufficient_privileges(
self, cursor, statements, parameters, verbosity, allow_quiet_fail=False
):
raise DatabaseError("ORA-01031: insufficient privileges")
def _test_database_passwd(self):
# Mocked to avoid test user password changed
return connection.settings_dict['SAVED_PASSWORD']
def patch_execute_statements(self, execute_statements):
return mock.patch.object(DatabaseCreation, '_execute_statements', execute_statements)
@mock.patch.object(DatabaseCreation, '_test_user_create', return_value=False)
def test_create_test_db(self, *mocked_objects):
creation = DatabaseCreation(connection)
# Simulate test database creation raising "tablespace already exists"
with self.patch_execute_statements(self._execute_raise_tablespace_already_exists):
with mock.patch('builtins.input', return_value='no'):
with self.assertRaises(SystemExit):
# SystemExit is raised if the user answers "no" to the
# prompt asking if it's okay to delete the test tablespace.
creation._create_test_db(verbosity=0, keepdb=False)
# "Tablespace already exists" error is ignored when keepdb is on
creation._create_test_db(verbosity=0, keepdb=True)
# Simulate test database creation raising unexpected error
with self.patch_execute_statements(self._execute_raise_insufficient_privileges):
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, keepdb=False)
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, keepdb=True)
@mock.patch.object(DatabaseCreation, '_test_database_create', return_value=False)
def test_create_test_user(self, *mocked_objects):
creation = DatabaseCreation(connection)
with mock.patch.object(DatabaseCreation, '_test_database_passwd', self._test_database_passwd):
# Simulate test user creation raising "user already exists"
with self.patch_execute_statements(self._execute_raise_user_already_exists):
with mock.patch('builtins.input', return_value='no'):
with self.assertRaises(SystemExit):
# SystemExit is raised if the user answers "no" to the
# prompt asking if it's okay to delete the test user.
creation._create_test_db(verbosity=0, keepdb=False)
# "User already exists" error is ignored when keepdb is on
creation._create_test_db(verbosity=0, keepdb=True)
# Simulate test user creation raising unexpected error
with self.patch_execute_statements(self._execute_raise_insufficient_privileges):
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, keepdb=False)
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, keepdb=True)

View File

@ -0,0 +1,55 @@
import unittest
from django.db import connection
@unittest.skipUnless(connection.vendor == 'oracle', 'Oracle tests')
class Tests(unittest.TestCase):
def test_quote_name(self):
"""'%' chars are escaped for query execution."""
name = '"SOME%NAME"'
quoted_name = connection.ops.quote_name(name)
self.assertEqual(quoted_name % (), name)
def test_dbms_session(self):
"""A stored procedure can be called through a cursor wrapper."""
with connection.cursor() as cursor:
cursor.callproc('DBMS_SESSION.SET_IDENTIFIER', ['_django_testing!'])
def test_cursor_var(self):
"""Cursor variables can be passed as query parameters."""
from django.db.backends.oracle.base import Database
with connection.cursor() as cursor:
var = cursor.var(Database.STRING)
cursor.execute("BEGIN %s := 'X'; END; ", [var])
self.assertEqual(var.getvalue(), 'X')
def test_long_string(self):
"""Text longer than 4000 chars can be saved and read."""
with connection.cursor() as cursor:
cursor.execute('CREATE TABLE ltext ("TEXT" NCLOB)')
long_str = ''.join(str(x) for x in range(4000))
cursor.execute('INSERT INTO ltext VALUES (%s)', [long_str])
cursor.execute('SELECT text FROM ltext')
row = cursor.fetchone()
self.assertEqual(long_str, row[0].read())
cursor.execute('DROP TABLE ltext')
def test_client_encoding(self):
"""Client encoding is set correctly."""
connection.ensure_connection()
self.assertEqual(connection.connection.encoding, 'UTF-8')
self.assertEqual(connection.connection.nencoding, 'UTF-8')
def test_order_of_nls_parameters(self):
"""
An 'almost right' datetime works with configured NLS parameters
(#18465).
"""
with connection.cursor() as cursor:
query = "select 1 from dual where '1936-12-29 00:00' < sysdate"
# The query succeeds without errors - pre #18465 this
# wasn't the case.
cursor.execute(query)
self.assertEqual(cursor.fetchone()[0], 1)

View File

View File

@ -0,0 +1,95 @@
import unittest
from contextlib import contextmanager
from io import StringIO
from unittest import mock
from django.db import connection
from django.db.backends.base.creation import BaseDatabaseCreation
from django.db.utils import DatabaseError
from django.test import SimpleTestCase
try:
import psycopg2 # NOQA
except ImportError:
pass
else:
from psycopg2 import errorcodes
from django.db.backends.postgresql.creation import DatabaseCreation
@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL tests')
class DatabaseCreationTests(SimpleTestCase):
@contextmanager
def changed_test_settings(self, **kwargs):
settings = connection.settings_dict['TEST']
saved_values = {}
for name in kwargs:
if name in settings:
saved_values[name] = settings[name]
for name, value in kwargs.items():
settings[name] = value
try:
yield
finally:
for name, value in kwargs.items():
if name in saved_values:
settings[name] = saved_values[name]
else:
del settings[name]
def check_sql_table_creation_suffix(self, settings, expected):
with self.changed_test_settings(**settings):
creation = DatabaseCreation(connection)
suffix = creation.sql_table_creation_suffix()
self.assertEqual(suffix, expected)
def test_sql_table_creation_suffix_with_none_settings(self):
settings = {'CHARSET': None, 'TEMPLATE': None}
self.check_sql_table_creation_suffix(settings, "")
def test_sql_table_creation_suffix_with_encoding(self):
settings = {'CHARSET': 'UTF8'}
self.check_sql_table_creation_suffix(settings, "WITH ENCODING 'UTF8'")
def test_sql_table_creation_suffix_with_template(self):
settings = {'TEMPLATE': 'template0'}
self.check_sql_table_creation_suffix(settings, 'WITH TEMPLATE "template0"')
def test_sql_table_creation_suffix_with_encoding_and_template(self):
settings = {'CHARSET': 'UTF8', 'TEMPLATE': 'template0'}
self.check_sql_table_creation_suffix(settings, '''WITH ENCODING 'UTF8' TEMPLATE "template0"''')
def _execute_raise_database_already_exists(self, cursor, parameters, keepdb=False):
error = DatabaseError('database %s already exists' % parameters['dbname'])
error.pgcode = errorcodes.DUPLICATE_DATABASE
raise DatabaseError() from error
def _execute_raise_permission_denied(self, cursor, parameters, keepdb=False):
error = DatabaseError('permission denied to create database')
error.pgcode = errorcodes.INSUFFICIENT_PRIVILEGE
raise DatabaseError() from error
def patch_test_db_creation(self, execute_create_test_db):
return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db)
@mock.patch('sys.stdout', new_callable=StringIO)
@mock.patch('sys.stderr', new_callable=StringIO)
def test_create_test_db(self, *mocked_objects):
creation = DatabaseCreation(connection)
# Simulate test database creation raising "database already exists"
with self.patch_test_db_creation(self._execute_raise_database_already_exists):
with mock.patch('builtins.input', return_value='no'):
with self.assertRaises(SystemExit):
# SystemExit is raised if the user answers "no" to the
# prompt asking if it's okay to delete the test database.
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
# "Database already exists" error is ignored when keepdb is on
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
# Simulate test database creation raising unexpected error
with self.patch_test_db_creation(self._execute_raise_permission_denied):
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)

View File

@ -6,10 +6,10 @@ from contextlib import contextmanager
from django.db import connection
from django.test import TestCase
from .models import Person
from ..models import Person
@unittest.skipUnless(connection.vendor == 'postgresql', "Test only for PostgreSQL")
@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL tests')
class ServerSideCursorsPostgres(TestCase):
cursor_fields = 'name, statement, is_holdable, is_binary, is_scrollable, creation_time'
PostgresCursor = namedtuple('PostgresCursor', cursor_fields)

View File

@ -0,0 +1,147 @@
import unittest
import warnings
from unittest import mock
from django.db import DatabaseError, connection
from django.test import TestCase
@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL tests')
class Tests(TestCase):
def test_nodb_connection(self):
"""
The _nodb_connection property fallbacks to the default connection
database when access to the 'postgres' database is not granted.
"""
def mocked_connect(self):
if self.settings_dict['NAME'] is None:
raise DatabaseError()
return ''
nodb_conn = connection._nodb_connection
self.assertIsNone(nodb_conn.settings_dict['NAME'])
# Now assume the 'postgres' db isn't available
with warnings.catch_warnings(record=True) as w:
with mock.patch('django.db.backends.base.base.BaseDatabaseWrapper.connect',
side_effect=mocked_connect, autospec=True):
warnings.simplefilter('always', RuntimeWarning)
nodb_conn = connection._nodb_connection
self.assertIsNotNone(nodb_conn.settings_dict['NAME'])
self.assertEqual(nodb_conn.settings_dict['NAME'], connection.settings_dict['NAME'])
# Check a RuntimeWarning has been emitted
self.assertEqual(len(w), 1)
self.assertEqual(w[0].message.__class__, RuntimeWarning)
def test_connect_and_rollback(self):
"""
PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
transaction is rolled back (#17062).
"""
new_connection = connection.copy()
try:
# Ensure the database default time zone is different than
# the time zone in new_connection.settings_dict. We can
# get the default time zone by reset & show.
cursor = new_connection.cursor()
cursor.execute("RESET TIMEZONE")
cursor.execute("SHOW TIMEZONE")
db_default_tz = cursor.fetchone()[0]
new_tz = 'Europe/Paris' if db_default_tz == 'UTC' else 'UTC'
new_connection.close()
# Invalidate timezone name cache, because the setting_changed
# handler cannot know about new_connection.
del new_connection.timezone_name
# Fetch a new connection with the new_tz as default
# time zone, run a query and rollback.
with self.settings(TIME_ZONE=new_tz):
new_connection.set_autocommit(False)
cursor = new_connection.cursor()
new_connection.rollback()
# Now let's see if the rollback rolled back the SET TIME ZONE.
cursor.execute("SHOW TIMEZONE")
tz = cursor.fetchone()[0]
self.assertEqual(new_tz, tz)
finally:
new_connection.close()
def test_connect_non_autocommit(self):
"""
The connection wrapper shouldn't believe that autocommit is enabled
after setting the time zone when AUTOCOMMIT is False (#21452).
"""
new_connection = connection.copy()
new_connection.settings_dict['AUTOCOMMIT'] = False
try:
# Open a database connection.
new_connection.cursor()
self.assertFalse(new_connection.get_autocommit())
finally:
new_connection.close()
def test_connect_isolation_level(self):
"""
The transaction level can be configured with
DATABASES ['OPTIONS']['isolation_level'].
"""
import psycopg2
from psycopg2.extensions import (
ISOLATION_LEVEL_READ_COMMITTED as read_committed,
ISOLATION_LEVEL_SERIALIZABLE as serializable,
)
# Since this is a django.test.TestCase, a transaction is in progress
# and the isolation level isn't reported as 0. This test assumes that
# PostgreSQL is configured with the default isolation level.
# Check the level on the psycopg2 connection, not the Django wrapper.
default_level = read_committed if psycopg2.__version__ < '2.7' else None
self.assertEqual(connection.connection.isolation_level, default_level)
new_connection = connection.copy()
new_connection.settings_dict['OPTIONS']['isolation_level'] = serializable
try:
# Start a transaction so the isolation level isn't reported as 0.
new_connection.set_autocommit(False)
# Check the level on the psycopg2 connection, not the Django wrapper.
self.assertEqual(new_connection.connection.isolation_level, serializable)
finally:
new_connection.close()
def _select(self, val):
with connection.cursor() as cursor:
cursor.execute('SELECT %s', (val,))
return cursor.fetchone()[0]
def test_select_ascii_array(self):
a = ['awef']
b = self._select(a)
self.assertEqual(a[0], b[0])
def test_select_unicode_array(self):
a = ['ᄲawef']
b = self._select(a)
self.assertEqual(a[0], b[0])
def test_lookup_cast(self):
from django.db.backends.postgresql.operations import DatabaseOperations
do = DatabaseOperations(connection=None)
lookups = (
'iexact', 'contains', 'icontains', 'startswith', 'istartswith',
'endswith', 'iendswith', 'regex', 'iregex',
)
for lookup in lookups:
with self.subTest(lookup=lookup):
self.assertIn('::text', do.lookup_cast(lookup))
def test_correct_extraction_psycopg2_version(self):
from django.db.backends.postgresql.base import psycopg2_version
with mock.patch('psycopg2.__version__', '4.2.1 (dt dec pq3 ext lo64)'):
self.assertEqual(psycopg2_version(), (4, 2, 1))
with mock.patch('psycopg2.__version__', '4.2b0.dev1 (dt dec pq3 ext lo64)'):
self.assertEqual(psycopg2_version(), (4, 2))

View File

View File

@ -0,0 +1,138 @@
import re
import threading
import unittest
from django.core.exceptions import ImproperlyConfigured
from django.db import connection
from django.db.models import Avg, StdDev, Sum, Variance
from django.test import (
TestCase, TransactionTestCase, override_settings, skipUnlessDBFeature,
)
from ..models import Item, Object, Square
@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
class Tests(TestCase):
longMessage = True
def test_autoincrement(self):
"""
auto_increment fields are created with the AUTOINCREMENT keyword
in order to be monotonically increasing (#10164).
"""
with connection.schema_editor(collect_sql=True) as editor:
editor.create_model(Square)
statements = editor.collected_sql
match = re.search('"id" ([^,]+),', statements[0])
self.assertIsNotNone(match)
self.assertEqual(
'integer NOT NULL PRIMARY KEY AUTOINCREMENT',
match.group(1),
'Wrong SQL used to create an auto-increment column on SQLite'
)
def test_aggregation(self):
"""
Raise NotImplementedError when aggregating on date/time fields (#19360).
"""
for aggregate in (Sum, Avg, Variance, StdDev):
with self.assertRaises(NotImplementedError):
Item.objects.all().aggregate(aggregate('time'))
with self.assertRaises(NotImplementedError):
Item.objects.all().aggregate(aggregate('date'))
with self.assertRaises(NotImplementedError):
Item.objects.all().aggregate(aggregate('last_modified'))
with self.assertRaises(NotImplementedError):
Item.objects.all().aggregate(
**{'complex': aggregate('last_modified') + aggregate('last_modified')}
)
def test_memory_db_test_name(self):
"""A named in-memory db should be allowed where supported."""
from django.db.backends.sqlite3.base import DatabaseWrapper
settings_dict = {
'TEST': {
'NAME': 'file:memorydb_test?mode=memory&cache=shared',
}
}
wrapper = DatabaseWrapper(settings_dict)
creation = wrapper.creation
if creation.connection.features.can_share_in_memory_db:
expected = creation.connection.settings_dict['TEST']['NAME']
self.assertEqual(creation._get_test_db_name(), expected)
else:
msg = (
"Using a shared memory database with `mode=memory` in the "
"database name is not supported in your environment, "
"use `:memory:` instead."
)
with self.assertRaisesMessage(ImproperlyConfigured, msg):
creation._get_test_db_name()
@unittest.skipUnless(connection.vendor == 'sqlite', 'Test only for SQLite')
@override_settings(DEBUG=True)
class LastExecutedQueryTest(TestCase):
def test_no_interpolation(self):
# This shouldn't raise an exception (#17158)
query = "SELECT strftime('%Y', 'now');"
connection.cursor().execute(query)
self.assertEqual(connection.queries[-1]['sql'], query)
def test_parameter_quoting(self):
# The implementation of last_executed_queries isn't optimal. It's
# worth testing that parameters are quoted (#14091).
query = "SELECT %s"
params = ["\"'\\"]
connection.cursor().execute(query, params)
# Note that the single quote is repeated
substituted = "SELECT '\"''\\'"
self.assertEqual(connection.queries[-1]['sql'], substituted)
def test_large_number_of_parameters(self):
# If SQLITE_MAX_VARIABLE_NUMBER (default = 999) has been changed to be
# greater than SQLITE_MAX_COLUMN (default = 2000), last_executed_query
# can hit the SQLITE_MAX_COLUMN limit (#26063).
cursor = connection.cursor()
sql = "SELECT MAX(%s)" % ", ".join(["%s"] * 2001)
params = list(range(2001))
# This should not raise an exception.
cursor.db.ops.last_executed_query(cursor.cursor, sql, params)
@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
class EscapingChecks(TestCase):
"""
All tests in this test case are also run with settings.DEBUG=True in
EscapingChecksDebug test case, to also test CursorDebugWrapper.
"""
def test_parameter_escaping(self):
# '%s' escaping support for sqlite3 (#13648).
cursor = connection.cursor()
cursor.execute("select strftime('%s', date('now'))")
response = cursor.fetchall()[0][0]
# response should be an non-zero integer
self.assertTrue(int(response))
@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
@override_settings(DEBUG=True)
class EscapingChecksDebug(EscapingChecks):
pass
@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
@skipUnlessDBFeature('can_share_in_memory_db')
class ThreadSharing(TransactionTestCase):
available_apps = ['backends']
def test_database_sharing_in_threads(self):
def create_object():
Object.objects.create()
create_object()
thread = threading.Thread(target=create_object)
thread.start()
thread.join()
self.assertEqual(Object.objects.count(), 2)

View File

@ -1,244 +0,0 @@
import copy
import unittest
from contextlib import contextmanager
from io import StringIO
from unittest import mock
from django.db import DEFAULT_DB_ALIAS, connection, connections
from django.db.backends.base.creation import (
TEST_DATABASE_PREFIX, BaseDatabaseCreation,
)
from django.db.backends.mysql.creation import (
DatabaseCreation as MySQLDatabaseCreation,
)
from django.db.backends.oracle.creation import (
DatabaseCreation as OracleDatabaseCreation,
)
from django.db.utils import DatabaseError
from django.test import SimpleTestCase, TestCase
try:
import psycopg2 # NOQA
except ImportError:
pass
else:
from psycopg2 import errorcodes
from django.db.backends.postgresql.creation import \
DatabaseCreation as PostgreSQLDatabaseCreation
class TestDbSignatureTests(SimpleTestCase):
def get_connection_copy(self):
# Get a copy of the default connection. (Can't use django.db.connection
# because it'll modify the default connection itself.)
test_connection = copy.copy(connections[DEFAULT_DB_ALIAS])
test_connection.settings_dict = copy.copy(connections[DEFAULT_DB_ALIAS].settings_dict)
return test_connection
def test_default_name(self):
# A test db name isn't set.
prod_name = 'hodor'
test_connection = self.get_connection_copy()
test_connection.settings_dict['NAME'] = prod_name
test_connection.settings_dict['TEST'] = {'NAME': None}
signature = BaseDatabaseCreation(test_connection).test_db_signature()
self.assertEqual(signature[3], TEST_DATABASE_PREFIX + prod_name)
def test_custom_test_name(self):
# A regular test db name is set.
test_name = 'hodor'
test_connection = self.get_connection_copy()
test_connection.settings_dict['TEST'] = {'NAME': test_name}
signature = BaseDatabaseCreation(test_connection).test_db_signature()
self.assertEqual(signature[3], test_name)
def test_custom_test_name_with_test_prefix(self):
# A test db name prefixed with TEST_DATABASE_PREFIX is set.
test_name = TEST_DATABASE_PREFIX + 'hodor'
test_connection = self.get_connection_copy()
test_connection.settings_dict['TEST'] = {'NAME': test_name}
signature = BaseDatabaseCreation(test_connection).test_db_signature()
self.assertEqual(signature[3], test_name)
@unittest.skipUnless(connection.vendor == 'postgresql', "PostgreSQL-specific tests")
class PostgreSQLDatabaseCreationTests(SimpleTestCase):
@contextmanager
def changed_test_settings(self, **kwargs):
settings = connection.settings_dict['TEST']
saved_values = {}
for name in kwargs:
if name in settings:
saved_values[name] = settings[name]
for name, value in kwargs.items():
settings[name] = value
try:
yield
finally:
for name, value in kwargs.items():
if name in saved_values:
settings[name] = saved_values[name]
else:
del settings[name]
def check_sql_table_creation_suffix(self, settings, expected):
with self.changed_test_settings(**settings):
creation = PostgreSQLDatabaseCreation(connection)
suffix = creation.sql_table_creation_suffix()
self.assertEqual(suffix, expected)
def test_sql_table_creation_suffix_with_none_settings(self):
settings = {'CHARSET': None, 'TEMPLATE': None}
self.check_sql_table_creation_suffix(settings, "")
def test_sql_table_creation_suffix_with_encoding(self):
settings = {'CHARSET': 'UTF8'}
self.check_sql_table_creation_suffix(settings, "WITH ENCODING 'UTF8'")
def test_sql_table_creation_suffix_with_template(self):
settings = {'TEMPLATE': 'template0'}
self.check_sql_table_creation_suffix(settings, 'WITH TEMPLATE "template0"')
def test_sql_table_creation_suffix_with_encoding_and_template(self):
settings = {'CHARSET': 'UTF8', 'TEMPLATE': 'template0'}
self.check_sql_table_creation_suffix(settings, '''WITH ENCODING 'UTF8' TEMPLATE "template0"''')
def _execute_raise_database_already_exists(self, cursor, parameters, keepdb=False):
error = DatabaseError('database %s already exists' % parameters['dbname'])
error.pgcode = errorcodes.DUPLICATE_DATABASE
raise DatabaseError() from error
def _execute_raise_permission_denied(self, cursor, parameters, keepdb=False):
error = DatabaseError('permission denied to create database')
error.pgcode = errorcodes.INSUFFICIENT_PRIVILEGE
raise DatabaseError() from error
def patch_test_db_creation(self, execute_create_test_db):
return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db)
@mock.patch('sys.stdout', new_callable=StringIO)
@mock.patch('sys.stderr', new_callable=StringIO)
def test_create_test_db(self, *mocked_objects):
creation = PostgreSQLDatabaseCreation(connection)
# Simulate test database creation raising "database already exists"
with self.patch_test_db_creation(self._execute_raise_database_already_exists):
with mock.patch('builtins.input', return_value='no'):
with self.assertRaises(SystemExit):
# SystemExit is raised if the user answers "no" to the
# prompt asking if it's okay to delete the test database.
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
# "Database already exists" error is ignored when keepdb is on
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
# Simulate test database creation raising unexpected error
with self.patch_test_db_creation(self._execute_raise_permission_denied):
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
@unittest.skipUnless(connection.vendor == 'oracle', "Oracle specific tests")
@mock.patch.object(OracleDatabaseCreation, '_maindb_connection', return_value=connection)
@mock.patch('sys.stdout', new_callable=StringIO)
@mock.patch('sys.stderr', new_callable=StringIO)
class OracleDatabaseCreationTests(TestCase):
def _execute_raise_user_already_exists(self, cursor, statements, parameters, verbosity, allow_quiet_fail=False):
# Raise "user already exists" only in test user creation
if statements and statements[0].startswith('CREATE USER'):
raise DatabaseError("ORA-01920: user name 'string' conflicts with another user or role name")
def _execute_raise_tablespace_already_exists(
self, cursor, statements, parameters, verbosity, allow_quiet_fail=False
):
raise DatabaseError("ORA-01543: tablespace 'string' already exists")
def _execute_raise_insufficient_privileges(
self, cursor, statements, parameters, verbosity, allow_quiet_fail=False
):
raise DatabaseError("ORA-01031: insufficient privileges")
def _test_database_passwd(self):
# Mocked to avoid test user password changed
return connection.settings_dict['SAVED_PASSWORD']
def patch_execute_statements(self, execute_statements):
return mock.patch.object(OracleDatabaseCreation, '_execute_statements', execute_statements)
@mock.patch.object(OracleDatabaseCreation, '_test_user_create', return_value=False)
def test_create_test_db(self, *mocked_objects):
creation = OracleDatabaseCreation(connection)
# Simulate test database creation raising "tablespace already exists"
with self.patch_execute_statements(self._execute_raise_tablespace_already_exists):
with mock.patch('builtins.input', return_value='no'):
with self.assertRaises(SystemExit):
# SystemExit is raised if the user answers "no" to the
# prompt asking if it's okay to delete the test tablespace.
creation._create_test_db(verbosity=0, keepdb=False)
# "Tablespace already exists" error is ignored when keepdb is on
creation._create_test_db(verbosity=0, keepdb=True)
# Simulate test database creation raising unexpected error
with self.patch_execute_statements(self._execute_raise_insufficient_privileges):
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, keepdb=False)
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, keepdb=True)
@mock.patch.object(OracleDatabaseCreation, '_test_database_create', return_value=False)
def test_create_test_user(self, *mocked_objects):
creation = OracleDatabaseCreation(connection)
with mock.patch.object(OracleDatabaseCreation, '_test_database_passwd', self._test_database_passwd):
# Simulate test user creation raising "user already exists"
with self.patch_execute_statements(self._execute_raise_user_already_exists):
with mock.patch('builtins.input', return_value='no'):
with self.assertRaises(SystemExit):
# SystemExit is raised if the user answers "no" to the
# prompt asking if it's okay to delete the test user.
creation._create_test_db(verbosity=0, keepdb=False)
# "User already exists" error is ignored when keepdb is on
creation._create_test_db(verbosity=0, keepdb=True)
# Simulate test user creation raising unexpected error
with self.patch_execute_statements(self._execute_raise_insufficient_privileges):
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, keepdb=False)
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, keepdb=True)
@unittest.skipUnless(connection.vendor == 'mysql', "MySQL specific tests")
class MySQLDatabaseCreationTests(SimpleTestCase):
def _execute_raise_database_exists(self, cursor, parameters, keepdb=False):
raise DatabaseError(1007, "Can't create database '%s'; database exists" % parameters['dbname'])
def _execute_raise_access_denied(self, cursor, parameters, keepdb=False):
raise DatabaseError(1044, "Access denied for user")
def patch_test_db_creation(self, execute_create_test_db):
return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db)
@mock.patch('sys.stdout', new_callable=StringIO)
@mock.patch('sys.stderr', new_callable=StringIO)
def test_create_test_db_database_exists(self, *mocked_objects):
# Simulate test database creation raising "database exists"
creation = MySQLDatabaseCreation(connection)
with self.patch_test_db_creation(self._execute_raise_database_exists):
with mock.patch('builtins.input', return_value='no'):
with self.assertRaises(SystemExit):
# SystemExit is raised if the user answers "no" to the
# prompt asking if it's okay to delete the test database.
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
# "Database exists" shouldn't appear when keepdb is on
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
@mock.patch('sys.stdout', new_callable=StringIO)
@mock.patch('sys.stderr', new_callable=StringIO)
def test_create_test_db_unexpected_error(self, *mocked_objects):
# Simulate test database creation raising unexpected error
creation = MySQLDatabaseCreation(connection)
with self.patch_test_db_creation(self._execute_raise_access_denied):
with self.assertRaises(SystemExit):
creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)

View File

@ -1,22 +1,11 @@
import unittest
"""Tests for django.db.backends.utils"""
from decimal import Decimal, Rounded
from django.core.exceptions import ImproperlyConfigured
from django.db import connection
from django.db.backends.utils import truncate_name
from django.db.utils import ProgrammingError, load_backend
from django.test import SimpleTestCase, TestCase
from django.db.backends.utils import format_number, truncate_name
from django.test import SimpleTestCase
class TestLoadBackend(SimpleTestCase):
def test_load_backend_invalid_name(self):
msg = (
"'foo' isn't an available database backend.\n"
"Try using 'django.db.backends.XXX', where XXX is one of:\n"
" 'mysql', 'oracle', 'postgresql', 'sqlite3'"
)
with self.assertRaisesMessage(ImproperlyConfigured, msg) as cm:
load_backend('foo')
self.assertEqual(str(cm.exception.__cause__), "No module named 'foo'")
class TestUtils(SimpleTestCase):
def test_truncate_name(self):
self.assertEqual(truncate_name('some_table', 10), 'some_table')
@ -28,15 +17,31 @@ class TestLoadBackend(SimpleTestCase):
self.assertEqual(truncate_name('username"."some_long_table', 10), 'username"."some_la38a')
self.assertEqual(truncate_name('username"."some_long_table', 10, 3), 'username"."some_loa38')
def test_format_number(self):
def equal(value, max_d, places, result):
self.assertEqual(format_number(Decimal(value), max_d, places), result)
@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests')
class TestDatabaseErrorWrapper(TestCase):
def test_reraising_backend_specific_database_exception(self):
cursor = connection.cursor()
msg = 'table "X" does not exist'
with self.assertRaisesMessage(ProgrammingError, msg) as cm:
cursor.execute('DROP TABLE "X"')
self.assertNotEqual(type(cm.exception), type(cm.exception.__cause__))
self.assertIsNotNone(cm.exception.__cause__)
self.assertIsNotNone(cm.exception.__cause__.pgcode)
self.assertIsNotNone(cm.exception.__cause__.pgerror)
equal('0', 12, 3, '0.000')
equal('0', 12, 8, '0.00000000')
equal('1', 12, 9, '1.000000000')
equal('0.00000000', 12, 8, '0.00000000')
equal('0.000000004', 12, 8, '0.00000000')
equal('0.000000008', 12, 8, '0.00000001')
equal('0.000000000000000000999', 10, 8, '0.00000000')
equal('0.1234567890', 12, 10, '0.1234567890')
equal('0.1234567890', 12, 9, '0.123456789')
equal('0.1234567890', 12, 8, '0.12345679')
equal('0.1234567890', 12, 5, '0.12346')
equal('0.1234567890', 12, 3, '0.123')
equal('0.1234567890', 12, 1, '0.1')
equal('0.1234567890', 12, 0, '0')
equal('0.1234567890', None, 0, '0')
equal('1234567890.1234567890', None, 0, '1234567890')
equal('1234567890.1234567890', None, 2, '1234567890.12')
equal('0.1234', 5, None, '0.1234')
equal('123.12', 5, None, '123.12')
with self.assertRaises(Rounded):
equal('0.1234567890', 5, None, '0.12346')
with self.assertRaises(Rounded):
equal('1234567890.1234', 5, None, '1234600000')

View File

@ -1,13 +1,9 @@
# Unit and doctests for specific database backends.
"""Tests related to django.db.backends that haven't been organized."""
import datetime
import re
import threading
import unittest
import warnings
from decimal import Decimal, Rounded
from unittest import mock
from django.core.exceptions import ImproperlyConfigured
from django.core.management.color import no_style
from django.db import (
DEFAULT_DB_ALIAS, DatabaseError, IntegrityError, connection, connections,
@ -15,322 +11,20 @@ from django.db import (
)
from django.db.backends.base.base import BaseDatabaseWrapper
from django.db.backends.signals import connection_created
from django.db.backends.utils import CursorWrapper, format_number
from django.db.models import Avg, StdDev, Sum, Variance
from django.db.backends.utils import CursorWrapper
from django.db.models.sql.constants import CURSOR
from django.db.utils import ConnectionHandler
from django.test import (
SimpleTestCase, TestCase, TransactionTestCase, override_settings,
skipIfDBFeature, skipUnlessDBFeature,
TestCase, TransactionTestCase, override_settings, skipIfDBFeature,
skipUnlessDBFeature,
)
from .models import (
Article, Item, Object, ObjectReference, Person, Post, RawData, Reporter,
Article, Object, ObjectReference, Person, Post, RawData, Reporter,
ReporterProxy, SchoolClass, Square,
VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ,
)
class DatabaseWrapperTests(SimpleTestCase):
def test_initialization_class_attributes(self):
"""
The "initialization" class attributes like client_class and
creation_class should be set on the class and reflected in the
corresponding instance attributes of the instantiated backend.
"""
conn = connections[DEFAULT_DB_ALIAS]
conn_class = type(conn)
attr_names = [
('client_class', 'client'),
('creation_class', 'creation'),
('features_class', 'features'),
('introspection_class', 'introspection'),
('ops_class', 'ops'),
('validation_class', 'validation'),
]
for class_attr_name, instance_attr_name in attr_names:
class_attr_value = getattr(conn_class, class_attr_name)
self.assertIsNotNone(class_attr_value)
instance_attr_value = getattr(conn, instance_attr_name)
self.assertIsInstance(instance_attr_value, class_attr_value)
def test_initialization_display_name(self):
self.assertEqual(BaseDatabaseWrapper.display_name, 'unknown')
self.assertNotEqual(connection.display_name, 'unknown')
class DummyBackendTest(SimpleTestCase):
def test_no_databases(self):
"""
Empty DATABASES setting default to the dummy backend.
"""
DATABASES = {}
conns = ConnectionHandler(DATABASES)
self.assertEqual(conns[DEFAULT_DB_ALIAS].settings_dict['ENGINE'], 'django.db.backends.dummy')
with self.assertRaises(ImproperlyConfigured):
conns[DEFAULT_DB_ALIAS].ensure_connection()
@unittest.skipUnless(connection.vendor == 'oracle', "Test only for Oracle")
class OracleTests(unittest.TestCase):
def test_quote_name(self):
# '%' chars are escaped for query execution.
name = '"SOME%NAME"'
quoted_name = connection.ops.quote_name(name)
self.assertEqual(quoted_name % (), name)
def test_dbms_session(self):
# If the backend is Oracle, test that we can call a standard
# stored procedure through our cursor wrapper.
with connection.cursor() as cursor:
cursor.callproc('DBMS_SESSION.SET_IDENTIFIER', ['_django_testing!'])
def test_cursor_var(self):
# If the backend is Oracle, test that we can pass cursor variables
# as query parameters.
from django.db.backends.oracle.base import Database
with connection.cursor() as cursor:
var = cursor.var(Database.STRING)
cursor.execute("BEGIN %s := 'X'; END; ", [var])
self.assertEqual(var.getvalue(), 'X')
def test_long_string(self):
# If the backend is Oracle, test that we can save a text longer
# than 4000 chars and read it properly
with connection.cursor() as cursor:
cursor.execute('CREATE TABLE ltext ("TEXT" NCLOB)')
long_str = ''.join(str(x) for x in range(4000))
cursor.execute('INSERT INTO ltext VALUES (%s)', [long_str])
cursor.execute('SELECT text FROM ltext')
row = cursor.fetchone()
self.assertEqual(long_str, row[0].read())
cursor.execute('DROP TABLE ltext')
def test_client_encoding(self):
# If the backend is Oracle, test that the client encoding is set
# correctly. This was broken under Cygwin prior to r14781.
connection.ensure_connection()
self.assertEqual(connection.connection.encoding, "UTF-8")
self.assertEqual(connection.connection.nencoding, "UTF-8")
def test_order_of_nls_parameters(self):
# an 'almost right' datetime should work with configured
# NLS parameters as per #18465.
with connection.cursor() as cursor:
query = "select 1 from dual where '1936-12-29 00:00' < sysdate"
# The query succeeds without errors - pre #18465 this
# wasn't the case.
cursor.execute(query)
self.assertEqual(cursor.fetchone()[0], 1)
@unittest.skipUnless(connection.vendor == 'sqlite', "Test only for SQLite")
class SQLiteTests(TestCase):
longMessage = True
def test_autoincrement(self):
"""
auto_increment fields are created with the AUTOINCREMENT keyword
in order to be monotonically increasing. Refs #10164.
"""
with connection.schema_editor(collect_sql=True) as editor:
editor.create_model(Square)
statements = editor.collected_sql
match = re.search('"id" ([^,]+),', statements[0])
self.assertIsNotNone(match)
self.assertEqual(
'integer NOT NULL PRIMARY KEY AUTOINCREMENT',
match.group(1),
"Wrong SQL used to create an auto-increment column on SQLite"
)
def test_aggregation(self):
"""
#19360: Raise NotImplementedError when aggregating on date/time fields.
"""
for aggregate in (Sum, Avg, Variance, StdDev):
with self.assertRaises(NotImplementedError):
Item.objects.all().aggregate(aggregate('time'))
with self.assertRaises(NotImplementedError):
Item.objects.all().aggregate(aggregate('date'))
with self.assertRaises(NotImplementedError):
Item.objects.all().aggregate(aggregate('last_modified'))
with self.assertRaises(NotImplementedError):
Item.objects.all().aggregate(
**{'complex': aggregate('last_modified') + aggregate('last_modified')}
)
def test_memory_db_test_name(self):
"""
A named in-memory db should be allowed where supported.
"""
from django.db.backends.sqlite3.base import DatabaseWrapper
settings_dict = {
'TEST': {
'NAME': 'file:memorydb_test?mode=memory&cache=shared',
}
}
wrapper = DatabaseWrapper(settings_dict)
creation = wrapper.creation
if creation.connection.features.can_share_in_memory_db:
expected = creation.connection.settings_dict['TEST']['NAME']
self.assertEqual(creation._get_test_db_name(), expected)
else:
msg = (
"Using a shared memory database with `mode=memory` in the "
"database name is not supported in your environment, "
"use `:memory:` instead."
)
with self.assertRaisesMessage(ImproperlyConfigured, msg):
creation._get_test_db_name()
@unittest.skipUnless(connection.vendor == 'postgresql', "Test only for PostgreSQL")
class PostgreSQLTests(TestCase):
def test_nodb_connection(self):
"""
The _nodb_connection property fallbacks to the default connection
database when access to the 'postgres' database is not granted.
"""
def mocked_connect(self):
if self.settings_dict['NAME'] is None:
raise DatabaseError()
return ''
nodb_conn = connection._nodb_connection
self.assertIsNone(nodb_conn.settings_dict['NAME'])
# Now assume the 'postgres' db isn't available
with warnings.catch_warnings(record=True) as w:
with mock.patch('django.db.backends.base.base.BaseDatabaseWrapper.connect',
side_effect=mocked_connect, autospec=True):
warnings.simplefilter('always', RuntimeWarning)
nodb_conn = connection._nodb_connection
self.assertIsNotNone(nodb_conn.settings_dict['NAME'])
self.assertEqual(nodb_conn.settings_dict['NAME'], connection.settings_dict['NAME'])
# Check a RuntimeWarning has been emitted
self.assertEqual(len(w), 1)
self.assertEqual(w[0].message.__class__, RuntimeWarning)
def test_connect_and_rollback(self):
"""
PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
transaction is rolled back (#17062).
"""
new_connection = connection.copy()
try:
# Ensure the database default time zone is different than
# the time zone in new_connection.settings_dict. We can
# get the default time zone by reset & show.
cursor = new_connection.cursor()
cursor.execute("RESET TIMEZONE")
cursor.execute("SHOW TIMEZONE")
db_default_tz = cursor.fetchone()[0]
new_tz = 'Europe/Paris' if db_default_tz == 'UTC' else 'UTC'
new_connection.close()
# Invalidate timezone name cache, because the setting_changed
# handler cannot know about new_connection.
del new_connection.timezone_name
# Fetch a new connection with the new_tz as default
# time zone, run a query and rollback.
with self.settings(TIME_ZONE=new_tz):
new_connection.set_autocommit(False)
cursor = new_connection.cursor()
new_connection.rollback()
# Now let's see if the rollback rolled back the SET TIME ZONE.
cursor.execute("SHOW TIMEZONE")
tz = cursor.fetchone()[0]
self.assertEqual(new_tz, tz)
finally:
new_connection.close()
def test_connect_non_autocommit(self):
"""
The connection wrapper shouldn't believe that autocommit is enabled
after setting the time zone when AUTOCOMMIT is False (#21452).
"""
new_connection = connection.copy()
new_connection.settings_dict['AUTOCOMMIT'] = False
try:
# Open a database connection.
new_connection.cursor()
self.assertFalse(new_connection.get_autocommit())
finally:
new_connection.close()
def test_connect_isolation_level(self):
"""
Regression test for #18130 and #24318.
"""
import psycopg2
from psycopg2.extensions import (
ISOLATION_LEVEL_READ_COMMITTED as read_committed,
ISOLATION_LEVEL_SERIALIZABLE as serializable,
)
# Since this is a django.test.TestCase, a transaction is in progress
# and the isolation level isn't reported as 0. This test assumes that
# PostgreSQL is configured with the default isolation level.
# Check the level on the psycopg2 connection, not the Django wrapper.
default_level = read_committed if psycopg2.__version__ < '2.7' else None
self.assertEqual(connection.connection.isolation_level, default_level)
new_connection = connection.copy()
new_connection.settings_dict['OPTIONS']['isolation_level'] = serializable
try:
# Start a transaction so the isolation level isn't reported as 0.
new_connection.set_autocommit(False)
# Check the level on the psycopg2 connection, not the Django wrapper.
self.assertEqual(new_connection.connection.isolation_level, serializable)
finally:
new_connection.close()
def _select(self, val):
with connection.cursor() as cursor:
cursor.execute("SELECT %s", (val,))
return cursor.fetchone()[0]
def test_select_ascii_array(self):
a = ["awef"]
b = self._select(a)
self.assertEqual(a[0], b[0])
def test_select_unicode_array(self):
a = ["ᄲawef"]
b = self._select(a)
self.assertEqual(a[0], b[0])
def test_lookup_cast(self):
from django.db.backends.postgresql.operations import DatabaseOperations
do = DatabaseOperations(connection=None)
for lookup in ('iexact', 'contains', 'icontains', 'startswith',
'istartswith', 'endswith', 'iendswith', 'regex', 'iregex'):
self.assertIn('::text', do.lookup_cast(lookup))
def test_correct_extraction_psycopg2_version(self):
from django.db.backends.postgresql.base import psycopg2_version
with mock.patch('psycopg2.__version__', '4.2.1 (dt dec pq3 ext lo64)'):
self.assertEqual(psycopg2_version(), (4, 2, 1))
with mock.patch('psycopg2.__version__', '4.2b0.dev1 (dt dec pq3 ext lo64)'):
self.assertEqual(psycopg2_version(), (4, 2))
class DateQuotingTest(TestCase):
def test_django_date_trunc(self):
@ -379,38 +73,6 @@ class LastExecutedQueryTest(TestCase):
last_sql = cursor.db.ops.last_executed_query(cursor, sql, params)
self.assertIsInstance(last_sql, str)
@unittest.skipUnless(connection.vendor == 'sqlite',
"This test is specific to SQLite.")
def test_no_interpolation_on_sqlite(self):
# This shouldn't raise an exception (##17158)
query = "SELECT strftime('%Y', 'now');"
connection.cursor().execute(query)
self.assertEqual(connection.queries[-1]['sql'], query)
@unittest.skipUnless(connection.vendor == 'sqlite',
"This test is specific to SQLite.")
def test_parameter_quoting_on_sqlite(self):
# The implementation of last_executed_queries isn't optimal. It's
# worth testing that parameters are quoted. See #14091.
query = "SELECT %s"
params = ["\"'\\"]
connection.cursor().execute(query, params)
# Note that the single quote is repeated
substituted = "SELECT '\"''\\'"
self.assertEqual(connection.queries[-1]['sql'], substituted)
@unittest.skipUnless(connection.vendor == 'sqlite',
"This test is specific to SQLite.")
def test_large_number_of_parameters_on_sqlite(self):
# If SQLITE_MAX_VARIABLE_NUMBER (default = 999) has been changed to be
# greater than SQLITE_MAX_COLUMN (default = 2000), last_executed_query
# can hit the SQLITE_MAX_COLUMN limit. See #26063.
cursor = connection.cursor()
sql = "SELECT MAX(%s)" % ", ".join(["%s"] * 2001)
params = list(range(2001))
# This should not raise an exception.
cursor.db.ops.last_executed_query(cursor.cursor, sql, params)
class ParameterHandlingTest(TestCase):
@ -539,16 +201,6 @@ class EscapingChecks(TestCase):
cursor.execute("SELECT '%%', %s" + self.bare_select_suffix, ('%d',))
self.assertEqual(cursor.fetchall()[0], ('%', '%d'))
@unittest.skipUnless(connection.vendor == 'sqlite',
"This is an sqlite-specific issue")
def test_sqlite_parameter_escaping(self):
# '%s' escaping support for sqlite3 #13648
cursor = connection.cursor()
cursor.execute("select strftime('%s', date('now'))")
response = cursor.fetchall()[0][0]
# response should be an non-zero integer
self.assertTrue(int(response))
@override_settings(DEBUG=True)
class EscapingChecksDebug(EscapingChecks):
@ -1128,76 +780,3 @@ class DBConstraintTestCase(TestCase):
intermediary_model.objects.create(from_object_id=obj.id, to_object_id=12345)
self.assertEqual(obj.related_objects.count(), 1)
self.assertEqual(intermediary_model.objects.count(), 2)
class BackendUtilTests(SimpleTestCase):
def test_format_number(self):
"""
Test the format_number converter utility
"""
def equal(value, max_d, places, result):
self.assertEqual(format_number(Decimal(value), max_d, places), result)
equal('0', 12, 3,
'0.000')
equal('0', 12, 8,
'0.00000000')
equal('1', 12, 9,
'1.000000000')
equal('0.00000000', 12, 8,
'0.00000000')
equal('0.000000004', 12, 8,
'0.00000000')
equal('0.000000008', 12, 8,
'0.00000001')
equal('0.000000000000000000999', 10, 8,
'0.00000000')
equal('0.1234567890', 12, 10,
'0.1234567890')
equal('0.1234567890', 12, 9,
'0.123456789')
equal('0.1234567890', 12, 8,
'0.12345679')
equal('0.1234567890', 12, 5,
'0.12346')
equal('0.1234567890', 12, 3,
'0.123')
equal('0.1234567890', 12, 1,
'0.1')
equal('0.1234567890', 12, 0,
'0')
equal('0.1234567890', None, 0,
'0')
equal('1234567890.1234567890', None, 0,
'1234567890')
equal('1234567890.1234567890', None, 2,
'1234567890.12')
equal('0.1234', 5, None,
'0.1234')
equal('123.12', 5, None,
'123.12')
with self.assertRaises(Rounded):
equal('0.1234567890', 5, None,
'0.12346')
with self.assertRaises(Rounded):
equal('1234567890.1234', 5, None,
'1234600000')
@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite specific test.')
@skipUnlessDBFeature('can_share_in_memory_db')
class TestSqliteThreadSharing(TransactionTestCase):
available_apps = ['backends']
def test_database_sharing_in_threads(self):
def create_object():
Object.objects.create()
create_object()
thread = threading.Thread(target=create_object)
thread.start()
thread.join()
self.assertEqual(Object.objects.count(), 2)

49
tests/db_utils/tests.py Normal file
View File

@ -0,0 +1,49 @@
"""Tests for django.db.utils."""
import unittest
from django.core.exceptions import ImproperlyConfigured
from django.db import DEFAULT_DB_ALIAS, connection
from django.db.utils import ConnectionHandler, ProgrammingError, load_backend
from django.test import SimpleTestCase, TestCase
class ConnectionHandlerTests(SimpleTestCase):
def test_connection_handler_no_databases(self):
"""Empty DATABASES setting defaults to the dummy backend."""
DATABASES = {}
conns = ConnectionHandler(DATABASES)
self.assertEqual(conns[DEFAULT_DB_ALIAS].settings_dict['ENGINE'], 'django.db.backends.dummy')
msg = (
'settings.DATABASES is improperly configured. Please supply the '
'ENGINE value. Check settings documentation for more details.'
)
with self.assertRaisesMessage(ImproperlyConfigured, msg):
conns[DEFAULT_DB_ALIAS].ensure_connection()
class DatabaseErrorWrapperTests(TestCase):
@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL test')
def test_reraising_backend_specific_database_exception(self):
cursor = connection.cursor()
msg = 'table "X" does not exist'
with self.assertRaisesMessage(ProgrammingError, msg) as cm:
cursor.execute('DROP TABLE "X"')
self.assertNotEqual(type(cm.exception), type(cm.exception.__cause__))
self.assertIsNotNone(cm.exception.__cause__)
self.assertIsNotNone(cm.exception.__cause__.pgcode)
self.assertIsNotNone(cm.exception.__cause__.pgerror)
class LoadBackendTests(SimpleTestCase):
def test_load_backend_invalid_name(self):
msg = (
"'foo' isn't an available database backend.\n"
"Try using 'django.db.backends.XXX', where XXX is one of:\n"
" 'mysql', 'oracle', 'postgresql', 'sqlite3'"
)
with self.assertRaisesMessage(ImproperlyConfigured, msg) as cm:
load_backend('foo')
self.assertEqual(str(cm.exception.__cause__), "No module named 'foo'")