Fixed #31777 -- Added support for database collations to Char/TextFields.

Thanks Simon Charette and Mariusz Felisiak for reviews.
This commit is contained in:
Tom Carrick 2020-07-18 13:17:39 +02:00 committed by Mariusz Felisiak
parent ba6b32e5ef
commit e387f191f7
25 changed files with 544 additions and 30 deletions

View File

@ -247,6 +247,9 @@ class Command(BaseCommand):
if field_type == 'CharField' and row.internal_size:
field_params['max_length'] = int(row.internal_size)
if field_type in {'CharField', 'TextField'} and row.collation:
field_params['db_collation'] = row.collation
if field_type == 'DecimalField':
if row.precision is None or row.scale is None:
field_notes.append(

View File

@ -302,10 +302,17 @@ class BaseDatabaseFeatures:
# {'d': [{'f': 'g'}]}?
json_key_contains_list_matching_requires_list = False
# Does the backend support column collations?
supports_collation_on_charfield = True
supports_collation_on_textfield = True
# Does the backend support non-deterministic collations?
supports_non_deterministic_collations = True
# Collation names for use by the Django test suite.
test_collations = {
'ci': None, # Case-insensitive.
'cs': None, # Case-sensitive.
'non_default': None, # Non-default.
'swedish_ci': None # Swedish case-insensitive.
}

View File

@ -4,7 +4,11 @@ from collections import namedtuple
TableInfo = namedtuple('TableInfo', ['name', 'type'])
# Structure returned by the DB-API cursor.description interface (PEP 249)
FieldInfo = namedtuple('FieldInfo', 'name type_code display_size internal_size precision scale null_ok default')
FieldInfo = namedtuple(
'FieldInfo',
'name type_code display_size internal_size precision scale null_ok '
'default collation'
)
class BaseDatabaseIntrospection:

View File

@ -61,6 +61,7 @@ class BaseDatabaseSchemaEditor:
sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
sql_alter_column_collate = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
sql_rename_column = "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
sql_update_with_default = "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
@ -215,6 +216,10 @@ class BaseDatabaseSchemaEditor:
# Check for fields that aren't actually columns (e.g. M2M)
if sql is None:
return None, None
# Collation.
collation = getattr(field, 'db_collation', None)
if collation:
sql += self._collate_sql(collation)
# Work out nullability
null = field.null
# If we were told to include a default value, do so
@ -676,8 +681,15 @@ class BaseDatabaseSchemaEditor:
actions = []
null_actions = []
post_actions = []
# Collation change?
old_collation = getattr(old_field, 'db_collation', None)
new_collation = getattr(new_field, 'db_collation', None)
if old_collation != new_collation:
# Collation change handles also a type change.
fragment = self._alter_column_collation_sql(model, new_field, new_type, new_collation)
actions.append(fragment)
# Type change?
if old_type != new_type:
elif old_type != new_type:
fragment, other_actions = self._alter_column_type_sql(model, old_field, new_field, new_type)
actions.append(fragment)
post_actions.extend(other_actions)
@ -895,6 +907,16 @@ class BaseDatabaseSchemaEditor:
[],
)
def _alter_column_collation_sql(self, model, new_field, new_type, new_collation):
return (
self.sql_alter_column_collate % {
'column': self.quote_name(new_field.column),
'type': new_type,
'collation': self._collate_sql(new_collation) if new_collation else '',
},
[],
)
def _alter_many_to_many(self, model, old_field, new_field, strict):
"""Alter M2Ms to repoint their to= endpoints."""
# Rename the through table
@ -1274,6 +1296,9 @@ class BaseDatabaseSchemaEditor:
def _delete_primary_key_sql(self, model, name):
return self._delete_constraint_sql(self.sql_delete_pk, model, name)
def _collate_sql(self, collation):
return ' COLLATE ' + self.quote_name(collation)
def remove_procedure(self, procedure_name, param_types=()):
sql = self.sql_delete_procedure % {
'procedure': self.quote_name(procedure_name),

View File

@ -46,6 +46,7 @@ class DatabaseFeatures(BaseDatabaseFeatures):
order_by_nulls_first = True
test_collations = {
'ci': 'utf8_general_ci',
'non_default': 'utf8_esperanto_ci',
'swedish_ci': 'utf8_swedish_ci',
}

View File

@ -10,7 +10,11 @@ from django.db.models import Index
from django.utils.datastructures import OrderedSet
FieldInfo = namedtuple('FieldInfo', BaseFieldInfo._fields + ('extra', 'is_unsigned', 'has_json_constraint'))
InfoLine = namedtuple('InfoLine', 'col_name data_type max_len num_prec num_scale extra column_default is_unsigned')
InfoLine = namedtuple(
'InfoLine',
'col_name data_type max_len num_prec num_scale extra column_default '
'collation is_unsigned'
)
class DatabaseIntrospection(BaseDatabaseIntrospection):
@ -84,6 +88,15 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
c.constraint_schema = DATABASE()
""", [table_name])
json_constraints = {row[0] for row in cursor.fetchall()}
# A default collation for the given table.
cursor.execute("""
SELECT table_collation
FROM information_schema.tables
WHERE table_schema = DATABASE()
AND table_name = %s
""", [table_name])
row = cursor.fetchone()
default_column_collation = row[0] if row else ''
# information_schema database gives more accurate results for some figures:
# - varchar length returned by cursor.description is an internal length,
# not visible length (#5725)
@ -93,12 +106,17 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
SELECT
column_name, data_type, character_maximum_length,
numeric_precision, numeric_scale, extra, column_default,
CASE
WHEN collation_name = %s THEN NULL
ELSE collation_name
END AS collation_name,
CASE
WHEN column_type LIKE '%% unsigned' THEN 1
ELSE 0
END AS is_unsigned
FROM information_schema.columns
WHERE table_name = %s AND table_schema = DATABASE()""", [table_name])
WHERE table_name = %s AND table_schema = DATABASE()
""", [default_column_collation, table_name])
field_info = {line[0]: InfoLine(*line) for line in cursor.fetchall()}
cursor.execute("SELECT * FROM %s LIMIT 1" % self.connection.ops.quote_name(table_name))
@ -116,6 +134,7 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
to_int(info.num_scale) or line[5],
line[6],
info.column_default,
info.collation,
info.extra,
info.is_unsigned,
line[0] in json_constraints,

View File

@ -9,6 +9,7 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
sql_alter_column_null = "MODIFY %(column)s %(type)s NULL"
sql_alter_column_not_null = "MODIFY %(column)s %(type)s NOT NULL"
sql_alter_column_type = "MODIFY %(column)s %(type)s"
sql_alter_column_collate = "MODIFY %(column)s %(type)s%(collation)s"
# No 'CASCADE' which works as a no-op in MySQL but is undocumented
sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"

View File

@ -1,4 +1,4 @@
from django.db import InterfaceError
from django.db import DatabaseError, InterfaceError
from django.db.backends.base.features import BaseDatabaseFeatures
from django.utils.functional import cached_property
@ -61,9 +61,11 @@ class DatabaseFeatures(BaseDatabaseFeatures):
supports_boolean_expr_in_select_clause = False
supports_primitives_in_json_field = False
supports_json_field_contains = False
supports_collation_on_textfield = False
test_collations = {
'ci': 'BINARY_CI',
'cs': 'BINARY',
'non_default': 'SWEDISH_CI',
'swedish_ci': 'SWEDISH_CI',
}
@ -78,3 +80,14 @@ class DatabaseFeatures(BaseDatabaseFeatures):
'SmallIntegerField': 'IntegerField',
'TimeField': 'DateTimeField',
}
@cached_property
def supports_collation_on_charfield(self):
with self.connection.cursor() as cursor:
try:
cursor.execute("SELECT CAST('a' AS VARCHAR2(4001)) FROM dual")
except DatabaseError as e:
if e.args[0].code == 910:
return False
raise
return True

View File

@ -95,14 +95,20 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
# user_tab_columns gives data default for columns
cursor.execute("""
SELECT
column_name,
data_default,
user_tab_cols.column_name,
user_tab_cols.data_default,
CASE
WHEN char_used IS NULL THEN data_length
ELSE char_length
WHEN user_tab_cols.collation = user_tables.default_collation
THEN NULL
ELSE user_tab_cols.collation
END collation,
CASE
WHEN user_tab_cols.char_used IS NULL
THEN user_tab_cols.data_length
ELSE user_tab_cols.char_length
END as internal_size,
CASE
WHEN identity_column = 'YES' THEN 1
WHEN user_tab_cols.identity_column = 'YES' THEN 1
ELSE 0
END as is_autofield,
CASE
@ -117,10 +123,13 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
ELSE 0
END as is_json
FROM user_tab_cols
WHERE table_name = UPPER(%s)""", [table_name])
LEFT OUTER JOIN
user_tables ON user_tables.table_name = user_tab_cols.table_name
WHERE user_tab_cols.table_name = UPPER(%s)
""", [table_name])
field_map = {
column: (internal_size, default if default != 'NULL' else None, is_autofield, is_json)
for column, default, internal_size, is_autofield, is_json in cursor.fetchall()
column: (internal_size, default if default != 'NULL' else None, collation, is_autofield, is_json)
for column, default, collation, internal_size, is_autofield, is_json in cursor.fetchall()
}
self.cache_bust_counter += 1
cursor.execute("SELECT * FROM {} WHERE ROWNUM < 2 AND {} > 0".format(
@ -129,11 +138,11 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
description = []
for desc in cursor.description:
name = desc[0]
internal_size, default, is_autofield, is_json = field_map[name]
internal_size, default, collation, is_autofield, is_json = field_map[name]
name = name % {} # cx_Oracle, for some reason, doubles percent signs.
description.append(FieldInfo(
self.identifier_converter(name), *desc[1:3], internal_size, desc[4] or 0,
desc[5] or 0, *desc[6:], default, is_autofield, is_json,
desc[5] or 0, *desc[6:], default, collation, is_autofield, is_json,
))
return description

View File

@ -14,6 +14,8 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
sql_alter_column_not_null = "MODIFY %(column)s NOT NULL"
sql_alter_column_default = "MODIFY %(column)s DEFAULT %(default)s"
sql_alter_column_no_default = "MODIFY %(column)s DEFAULT NULL"
sql_alter_column_collate = "MODIFY %(column)s %(type)s%(collation)s"
sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"
sql_create_column_inline_fk = 'CONSTRAINT %(name)s REFERENCES %(to_table)s(%(to_column)s)%(deferrable)s'
sql_delete_table = "DROP TABLE %(table)s CASCADE CONSTRAINTS"
@ -181,3 +183,15 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
'table': self.quote_name(table_name),
'column': self.quote_name(column_name),
})
def _get_default_collation(self, table_name):
with self.connection.cursor() as cursor:
cursor.execute("""
SELECT default_collation FROM user_tables WHERE table_name = %s
""", [self.normalize_name(table_name)])
return cursor.fetchone()[0]
def _alter_column_collation_sql(self, model, new_field, new_type, new_collation):
if new_collation is None:
new_collation = self._get_default_collation(model._meta.db_table)
return super()._alter_column_collation_sql(model, new_field, new_type, new_collation)

View File

@ -59,6 +59,7 @@ class DatabaseFeatures(BaseDatabaseFeatures):
has_json_operators = True
json_key_contains_list_matching_requires_list = True
test_collations = {
'non_default': 'sv-x-icu',
'swedish_ci': 'sv-x-icu',
}
@ -92,3 +93,4 @@ class DatabaseFeatures(BaseDatabaseFeatures):
supports_table_partitions = property(operator.attrgetter('is_postgresql_10'))
supports_covering_indexes = property(operator.attrgetter('is_postgresql_11'))
supports_covering_gist_indexes = property(operator.attrgetter('is_postgresql_12'))
supports_non_deterministic_collations = property(operator.attrgetter('is_postgresql_12'))

View File

@ -69,9 +69,11 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
SELECT
a.attname AS column_name,
NOT (a.attnotnull OR (t.typtype = 'd' AND t.typnotnull)) AS is_nullable,
pg_get_expr(ad.adbin, ad.adrelid) AS column_default
pg_get_expr(ad.adbin, ad.adrelid) AS column_default,
CASE WHEN collname = 'default' THEN NULL ELSE collname END AS collation
FROM pg_attribute a
LEFT JOIN pg_attrdef ad ON a.attrelid = ad.adrelid AND a.attnum = ad.adnum
LEFT JOIN pg_collation co ON a.attcollation = co.oid
JOIN pg_type t ON a.atttypid = t.oid
JOIN pg_class c ON a.attrelid = c.oid
JOIN pg_namespace n ON c.relnamespace = n.oid

View File

@ -47,6 +47,7 @@ class DatabaseFeatures(BaseDatabaseFeatures):
test_collations = {
'ci': 'nocase',
'cs': 'binary',
'non_default': 'nocase',
}
@cached_property

View File

@ -84,6 +84,7 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
"""
cursor.execute('PRAGMA table_info(%s)' % self.connection.ops.quote_name(table_name))
table_info = cursor.fetchall()
collations = self._get_column_collations(cursor, table_name)
json_columns = set()
if self.connection.features.can_introspect_json_field:
for line in table_info:
@ -102,7 +103,7 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
return [
FieldInfo(
name, data_type, None, get_field_size(data_type), None, None,
not notnull, default, pk == 1, name in json_columns
not notnull, default, collations.get(name), pk == 1, name in json_columns
)
for cid, name, data_type, notnull, default, pk in table_info
]
@ -435,3 +436,27 @@ class DatabaseIntrospection(BaseDatabaseIntrospection):
}
constraints.update(self._get_foreign_key_constraints(cursor, table_name))
return constraints
def _get_column_collations(self, cursor, table_name):
row = cursor.execute("""
SELECT sql
FROM sqlite_master
WHERE type = 'table' AND name = %s
""", [table_name]).fetchone()
if not row:
return {}
sql = row[0]
columns = str(sqlparse.parse(sql)[0][-1]).strip('()').split(', ')
collations = {}
for column in columns:
tokens = column[1:].split()
column_name = tokens[0].strip('"')
for index, token in enumerate(tokens):
if token == 'COLLATE':
collation = tokens[index + 1]
break
else:
collation = None
collations[column_name] = collation
return collations

View File

@ -429,3 +429,6 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
super().remove_constraint(model, constraint)
else:
self._remake_table(model)
def _collate_sql(self, collation):
return ' COLLATE ' + collation

View File

@ -1002,13 +1002,16 @@ class BooleanField(Field):
class CharField(Field):
description = _("String (up to %(max_length)s)")
def __init__(self, *args, **kwargs):
def __init__(self, *args, db_collation=None, **kwargs):
super().__init__(*args, **kwargs)
self.db_collation = db_collation
self.validators.append(validators.MaxLengthValidator(self.max_length))
def check(self, **kwargs):
databases = kwargs.get('databases') or []
return [
*super().check(**kwargs),
*self._check_db_collation(databases),
*self._check_max_length_attribute(**kwargs),
]
@ -1033,6 +1036,27 @@ class CharField(Field):
else:
return []
def _check_db_collation(self, databases):
errors = []
for db in databases:
if not router.allow_migrate_model(db, self.model):
continue
connection = connections[db]
if not (
self.db_collation is None or
'supports_collation_on_charfield' in self.model._meta.required_db_features or
connection.features.supports_collation_on_charfield
):
errors.append(
checks.Error(
'%s does not support a database collation on '
'CharFields.' % connection.display_name,
obj=self,
id='fields.E190',
),
)
return errors
def cast_db_type(self, connection):
if self.max_length is None:
return connection.ops.cast_char_field_without_max_length
@ -1061,6 +1085,12 @@ class CharField(Field):
defaults.update(kwargs)
return super().formfield(**defaults)
def deconstruct(self):
name, path, args, kwargs = super().deconstruct()
if self.db_collation:
kwargs['db_collation'] = self.db_collation
return name, path, args, kwargs
class CommaSeparatedIntegerField(CharField):
default_validators = [validators.validate_comma_separated_integer_list]
@ -2074,6 +2104,38 @@ class SmallIntegerField(IntegerField):
class TextField(Field):
description = _("Text")
def __init__(self, *args, db_collation=None, **kwargs):
super().__init__(*args, **kwargs)
self.db_collation = db_collation
def check(self, **kwargs):
databases = kwargs.get('databases') or []
return [
*super().check(**kwargs),
*self._check_db_collation(databases),
]
def _check_db_collation(self, databases):
errors = []
for db in databases:
if not router.allow_migrate_model(db, self.model):
continue
connection = connections[db]
if not (
self.db_collation is None or
'supports_collation_on_textfield' in self.model._meta.required_db_features or
connection.features.supports_collation_on_textfield
):
errors.append(
checks.Error(
'%s does not support a database collation on '
'TextFields.' % connection.display_name,
obj=self,
id='fields.E190',
),
)
return errors
def get_internal_type(self):
return "TextField"
@ -2096,6 +2158,12 @@ class TextField(Field):
**kwargs,
})
def deconstruct(self):
name, path, args, kwargs = super().deconstruct()
if self.db_collation:
kwargs['db_collation'] = self.db_collation
return name, path, args, kwargs
class TimeField(DateTimeCheckMixin, Field):
empty_strings_allowed = False

View File

@ -196,6 +196,8 @@ Model fields
* **fields.E170**: ``BinaryField``s ``default`` cannot be a string. Use bytes
content instead.
* **fields.E180**: ``<database>`` does not support ``JSONField``\s.
* **fields.E190**: ``<database>`` does not support a database collation on
``<field_type>``\s.
* **fields.E900**: ``IPAddressField`` has been removed except for support in
historical migrations.
* **fields.W900**: ``IPAddressField`` has been deprecated. Support for it

View File

@ -593,21 +593,37 @@ For large amounts of text, use :class:`~django.db.models.TextField`.
The default form widget for this field is a :class:`~django.forms.TextInput`.
:class:`CharField` has one extra required argument:
:class:`CharField` has two extra arguments:
.. attribute:: CharField.max_length
The maximum length (in characters) of the field. The max_length is enforced
at the database level and in Django's validation using
Required. The maximum length (in characters) of the field. The max_length
is enforced at the database level and in Django's validation using
:class:`~django.core.validators.MaxLengthValidator`.
.. note::
.. note::
If you are writing an application that must be portable to multiple
database backends, you should be aware that there are restrictions on
``max_length`` for some backends. Refer to the :doc:`database backend
notes </ref/databases>` for details.
.. attribute:: CharField.db_collation
.. versionadded:: 3.2
Optional. The database collation name of the field.
.. note::
Collation names are not standardized. As such, this will not be
portable across multiple database backends.
.. admonition:: Oracle
Oracle supports collations only when the ``MAX_STRING_SIZE`` database
initialization parameter is set to ``EXTENDED``.
``DateField``
-------------
@ -1329,6 +1345,21 @@ If you specify a ``max_length`` attribute, it will be reflected in the
However it is not enforced at the model or database level. Use a
:class:`CharField` for that.
.. attribute:: TextField.db_collation
.. versionadded:: 3.2
The database collation name of the field.
.. note::
Collation names are not standardized. As such, this will not be
portable across multiple database backends.
.. admonition:: Oracle
Oracle does not support collations for a ``TextField``.
``TimeField``
-------------

View File

@ -308,6 +308,11 @@ Models
:class:`~django.db.models.functions.TruncTime` database functions allows
truncating datetimes in a specific timezone.
* The new ``db_collation`` argument for
:attr:`CharField <django.db.models.CharField.db_collation>` and
:attr:`TextField <django.db.models.TextField.db_collation>` allows setting a
database collation for the field.
Pagination
~~~~~~~~~~
@ -431,6 +436,13 @@ backends.
unique constraints (:attr:`.UniqueConstraint.include`), set
``DatabaseFeatures.supports_covering_indexes`` to ``True``.
* Third-party database backends must implement support for column database
collations on ``CharField``\s and ``TextField``\s or set
``DatabaseFeatures.supports_collation_on_charfield`` and
``DatabaseFeatures.supports_collation_on_textfield`` to ``False``. If
non-deterministic collations are not supported, set
``supports_non_deterministic_collations`` to ``False``.
:mod:`django.contrib.admin`
---------------------------

View File

@ -1,4 +1,4 @@
from django.db import models
from django.db import connection, models
class People(models.Model):
@ -79,6 +79,23 @@ class JSONFieldColumnType(models.Model):
}
test_collation = connection.features.test_collations.get('non_default')
class CharFieldDbCollation(models.Model):
char_field = models.CharField(max_length=10, db_collation=test_collation)
class Meta:
required_db_features = {'supports_collation_on_charfield'}
class TextFieldDbCollation(models.Model):
text_field = models.TextField(db_collation=test_collation)
class Meta:
required_db_features = {'supports_collation_on_textfield'}
class UniqueTogether(models.Model):
field1 = models.IntegerField()
field2 = models.CharField(max_length=10)

View File

@ -8,7 +8,7 @@ from django.db import connection
from django.db.backends.base.introspection import TableInfo
from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
from .models import PeopleMoreData
from .models import PeopleMoreData, test_collation
def inspectdb_tables_only(table_name):
@ -104,6 +104,41 @@ class InspectDBTestCase(TestCase):
self.assertIn('json_field = models.JSONField()', output)
self.assertIn('null_json_field = models.JSONField(blank=True, null=True)', output)
@skipUnlessDBFeature('supports_collation_on_charfield')
def test_char_field_db_collation(self):
out = StringIO()
call_command('inspectdb', 'inspectdb_charfielddbcollation', stdout=out)
output = out.getvalue()
if not connection.features.interprets_empty_strings_as_nulls:
self.assertIn(
"char_field = models.CharField(max_length=10, "
"db_collation='%s')" % test_collation,
output,
)
else:
self.assertIn(
"char_field = models.CharField(max_length=10, "
"db_collation='%s', blank=True, null=True)" % test_collation,
output,
)
@skipUnlessDBFeature('supports_collation_on_textfield')
def test_text_field_db_collation(self):
out = StringIO()
call_command('inspectdb', 'inspectdb_textfielddbcollation', stdout=out)
output = out.getvalue()
if not connection.features.interprets_empty_strings_as_nulls:
self.assertIn(
"text_field = models.TextField(db_collation='%s')" % test_collation,
output,
)
else:
self.assertIn(
"text_field = models.TextField(db_collation='%s, blank=True, "
"null=True)" % test_collation,
output,
)
def test_number_field_types(self):
"""Test introspection of various Django field types"""
assertFieldType = self.make_field_type_asserter()

View File

@ -86,7 +86,7 @@ class BinaryFieldTests(SimpleTestCase):
@isolate_apps('invalid_models_tests')
class CharFieldTests(SimpleTestCase):
class CharFieldTests(TestCase):
def test_valid_field(self):
class Model(models.Model):
@ -387,6 +387,30 @@ class CharFieldTests(SimpleTestCase):
)
])
def test_db_collation(self):
class Model(models.Model):
field = models.CharField(max_length=100, db_collation='anything')
field = Model._meta.get_field('field')
error = Error(
'%s does not support a database collation on CharFields.'
% connection.display_name,
id='fields.E190',
obj=field,
)
expected = [] if connection.features.supports_collation_on_charfield else [error]
self.assertEqual(field.check(databases=self.databases), expected)
def test_db_collation_required_db_features(self):
class Model(models.Model):
field = models.CharField(max_length=100, db_collation='anything')
class Meta:
required_db_features = {'supports_collation_on_charfield'}
field = Model._meta.get_field('field')
self.assertEqual(field.check(databases=self.databases), [])
@isolate_apps('invalid_models_tests')
class DateFieldTests(SimpleTestCase):
@ -779,6 +803,30 @@ class TextFieldTests(TestCase):
)
])
def test_db_collation(self):
class Model(models.Model):
field = models.TextField(db_collation='anything')
field = Model._meta.get_field('field')
error = Error(
'%s does not support a database collation on TextFields.'
% connection.display_name,
id='fields.E190',
obj=field,
)
expected = [] if connection.features.supports_collation_on_textfield else [error]
self.assertEqual(field.check(databases=self.databases), expected)
def test_db_collation_required_db_features(self):
class Model(models.Model):
field = models.TextField(db_collation='anything')
class Meta:
required_db_features = {'supports_collation_on_textfield'}
field = Model._meta.get_field('field')
self.assertEqual(field.check(databases=self.databases), [])
@isolate_apps('invalid_models_tests')
class UUIDFieldTests(TestCase):

View File

@ -44,6 +44,16 @@ class TestCharField(TestCase):
self.assertEqual(p2.title, Event.C)
class TestMethods(SimpleTestCase):
def test_deconstruct(self):
field = models.CharField()
*_, kwargs = field.deconstruct()
self.assertEqual(kwargs, {})
field = models.CharField(db_collation='utf8_esperanto_ci')
*_, kwargs = field.deconstruct()
self.assertEqual(kwargs, {'db_collation': 'utf8_esperanto_ci'})
class ValidationTests(SimpleTestCase):
class Choices(models.TextChoices):

View File

@ -2,7 +2,7 @@ from unittest import skipIf
from django import forms
from django.db import connection, models
from django.test import TestCase
from django.test import SimpleTestCase, TestCase
from .models import Post
@ -37,3 +37,13 @@ class TextFieldTests(TestCase):
p = Post.objects.create(title='Whatever', body='Smile 😀.')
p.refresh_from_db()
self.assertEqual(p.body, 'Smile 😀.')
class TestMethods(SimpleTestCase):
def test_deconstruct(self):
field = models.TextField()
*_, kwargs = field.deconstruct()
self.assertEqual(kwargs, {})
field = models.TextField(db_collation='utf8_esperanto_ci')
*_, kwargs = field.deconstruct()
self.assertEqual(kwargs, {'db_collation': 'utf8_esperanto_ci'})

View File

@ -185,6 +185,14 @@ class SchemaTests(TransactionTestCase):
counts['indexes'] += 1
return counts
def get_column_collation(self, table, column):
with connection.cursor() as cursor:
return next(
f.collation
for f in connection.introspection.get_table_description(cursor, table)
if f.name == column
)
def assertIndexOrder(self, table, index, order):
constraints = self.get_constraints(table)
self.assertIn(index, constraints)
@ -3224,3 +3232,147 @@ class SchemaTests(TransactionTestCase):
with connection.schema_editor(atomic=True) as editor:
editor.alter_db_table(Foo, Foo._meta.db_table, 'renamed_table')
Foo._meta.db_table = 'renamed_table'
@isolate_apps('schema')
@skipUnlessDBFeature('supports_collation_on_charfield')
def test_db_collation_charfield(self):
collation = connection.features.test_collations['non_default']
class Foo(Model):
field = CharField(max_length=255, db_collation=collation)
class Meta:
app_label = 'schema'
self.isolated_local_models = [Foo]
with connection.schema_editor() as editor:
editor.create_model(Foo)
self.assertEqual(
self.get_column_collation(Foo._meta.db_table, 'field'),
collation,
)
@isolate_apps('schema')
@skipUnlessDBFeature('supports_collation_on_textfield')
def test_db_collation_textfield(self):
collation = connection.features.test_collations['non_default']
class Foo(Model):
field = TextField(db_collation=collation)
class Meta:
app_label = 'schema'
self.isolated_local_models = [Foo]
with connection.schema_editor() as editor:
editor.create_model(Foo)
self.assertEqual(
self.get_column_collation(Foo._meta.db_table, 'field'),
collation,
)
@skipUnlessDBFeature('supports_collation_on_charfield')
def test_add_field_db_collation(self):
with connection.schema_editor() as editor:
editor.create_model(Author)
collation = connection.features.test_collations['non_default']
new_field = CharField(max_length=255, db_collation=collation)
new_field.set_attributes_from_name('alias')
with connection.schema_editor() as editor:
editor.add_field(Author, new_field)
columns = self.column_classes(Author)
self.assertEqual(
columns['alias'][0],
connection.features.introspected_field_types['CharField'],
)
self.assertEqual(columns['alias'][1][8], collation)
@skipUnlessDBFeature('supports_collation_on_charfield')
def test_alter_field_db_collation(self):
with connection.schema_editor() as editor:
editor.create_model(Author)
collation = connection.features.test_collations['non_default']
old_field = Author._meta.get_field('name')
new_field = CharField(max_length=255, db_collation=collation)
new_field.set_attributes_from_name('name')
new_field.model = Author
with connection.schema_editor() as editor:
editor.alter_field(Author, old_field, new_field, strict=True)
self.assertEqual(
self.get_column_collation(Author._meta.db_table, 'name'),
collation,
)
with connection.schema_editor() as editor:
editor.alter_field(Author, new_field, old_field, strict=True)
self.assertIsNone(self.get_column_collation(Author._meta.db_table, 'name'))
@skipUnlessDBFeature('supports_collation_on_charfield')
def test_alter_field_type_and_db_collation(self):
with connection.schema_editor() as editor:
editor.create_model(Note)
collation = connection.features.test_collations['non_default']
old_field = Note._meta.get_field('info')
new_field = CharField(max_length=255, db_collation=collation)
new_field.set_attributes_from_name('info')
new_field.model = Note
with connection.schema_editor() as editor:
editor.alter_field(Note, old_field, new_field, strict=True)
columns = self.column_classes(Note)
self.assertEqual(
columns['info'][0],
connection.features.introspected_field_types['CharField'],
)
self.assertEqual(columns['info'][1][8], collation)
with connection.schema_editor() as editor:
editor.alter_field(Note, new_field, old_field, strict=True)
columns = self.column_classes(Note)
self.assertEqual(columns['info'][0], 'TextField')
self.assertIsNone(columns['info'][1][8])
@skipUnlessDBFeature(
'supports_collation_on_charfield',
'supports_non_deterministic_collations',
)
def test_ci_cs_db_collation(self):
cs_collation = connection.features.test_collations.get('cs')
ci_collation = connection.features.test_collations.get('ci')
try:
if connection.vendor == 'mysql':
cs_collation = 'latin1_general_cs'
elif connection.vendor == 'postgresql':
cs_collation = 'en-x-icu'
with connection.cursor() as cursor:
cursor.execute(
"CREATE COLLATION IF NOT EXISTS case_insensitive "
"(provider = icu, locale = 'und-u-ks-level2', "
"deterministic = false)"
)
ci_collation = 'case_insensitive'
# Create the table.
with connection.schema_editor() as editor:
editor.create_model(Author)
# Case-insensitive collation.
old_field = Author._meta.get_field('name')
new_field_ci = CharField(max_length=255, db_collation=ci_collation)
new_field_ci.set_attributes_from_name('name')
new_field_ci.model = Author
with connection.schema_editor() as editor:
editor.alter_field(Author, old_field, new_field_ci, strict=True)
Author.objects.create(name='ANDREW')
self.assertIs(Author.objects.filter(name='Andrew').exists(), True)
# Case-sensitive collation.
new_field_cs = CharField(max_length=255, db_collation=cs_collation)
new_field_cs.set_attributes_from_name('name')
new_field_cs.model = Author
with connection.schema_editor() as editor:
editor.alter_field(Author, new_field_ci, new_field_cs, strict=True)
self.assertIs(Author.objects.filter(name='Andrew').exists(), False)
finally:
if connection.vendor == 'postgresql':
with connection.cursor() as cursor:
cursor.execute('DROP COLLATION IF EXISTS case_insensitive')